Ticket #4544: middle-pgsql.c

File middle-pgsql.c, 52.5 KB (added by chris@…, 7 years ago)
Line 
1/* Implements the mid-layer processing for osm2pgsql
2 * using several PostgreSQL tables
3 *
4 * This layer stores data read in from the planet.osm file
5 * and is then read by the backend processing code to
6 * emit the final geometry-enabled output formats
7*/
8
9#include "config.h"
10
11#include <stdio.h>
12#include <unistd.h>
13#include <stdlib.h>
14#include <string.h>
15#include <assert.h>
16#include <math.h>
17#include <time.h>
18
19#ifdef HAVE_PTHREAD
20#include <pthread.h>
21#endif
22#ifdef HAVE_SYS_WAIT_H
23#include <sys/wait.h>
24#endif
25#ifdef HAVE_MMAP
26#include <sys/mman.h>
27#endif
28
29#ifndef  MAP_ANONYMOUS
30#ifdef MAP_ANON
31#define MAP_ANONYMOUS MAP_ANON
32#endif
33#endif
34
35#include <libpq-fe.h>
36
37#include "osmtypes.h"
38#include "middle.h"
39#include "middle-pgsql.h"
40#include "output-pgsql.h"
41#include "node-ram-cache.h"
42#include "node-persistent-cache.h"
43#include "pgsql.h"
44
45static int scale = 100;
46
47struct progress_info {
48  time_t start;
49  time_t end;
50  int count;
51  int finished;
52};
53
54enum table_id {
55    t_node, t_way, t_rel
56} ;
57
58struct table_desc {
59    //enum table_id table;
60    const char *name;
61    const char *start;
62    const char *create;
63    const char *create_index;
64    const char *prepare;
65    const char *prepare_intarray;
66    const char *copy;
67    const char *analyze;
68    const char *stop;
69    const char *array_indexes;
70
71    int copyMode;    /* True if we are in copy mode */
72    int transactionMode;    /* True if we are in an extended transaction */
73    PGconn *sql_conn;
74};
75
76static struct table_desc tables [] = {
77    {
78        //table = t_node,
79         .name = "%p_nodes",
80        .start = "BEGIN;\n",
81#ifdef FIXED_POINT
82       .create = "CREATE %m TABLE %p_nodes (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, lat int4 not null, lon int4 not null, tags text[]) {TABLESPACE %t};\n",
83      .prepare = "PREPARE insert_node (" POSTGRES_OSMID_TYPE ", int4, int4, text[]) AS INSERT INTO %p_nodes VALUES ($1,$2,$3,$4);\n"
84#else
85       .create = "CREATE %m TABLE %p_nodes (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, lat double precision not null, lon double precision not null, tags text[]) {TABLESPACE %t};\n",
86      .prepare = "PREPARE insert_node (" POSTGRES_OSMID_TYPE ", double precision, double precision, text[]) AS INSERT INTO %p_nodes VALUES ($1,$2,$3,$4);\n"
87#endif
88               "PREPARE get_node (" POSTGRES_OSMID_TYPE ") AS SELECT lat,lon,tags FROM %p_nodes WHERE id = $1 LIMIT 1;\n"
89               "PREPARE delete_node (" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_nodes WHERE id = $1;\n",
90.prepare_intarray = // This is to fetch lots of nodes simultaneously, in order including duplicates. The commented out version doesn't do duplicates
91                  // It's not optimal as it does a Nested Loop / Index Scan which is suboptimal for large arrays
92                  //"PREPARE get_node_list(int[]) AS SELECT id, lat, lon FROM %p_nodes WHERE id = ANY($1::int4[]) ORDER BY $1::int4[] # id\n",
93                 //"PREPARE get_node_list(" POSTGRES_OSMID_TYPE "[]) AS select y.id, y.lat, y.lon from (select i, ($1)[i] as l_id from (select generate_series(1,array_length($1, 1)) as i) x) z, "
94                 //                                    "(select * from %p_nodes where id = ANY($1)) y where l_id=id order by i;\n",
95                 "PREPARE get_node_list(" POSTGRES_OSMID_TYPE "[]) AS SELECT id, lat, lon FROM %p_nodes WHERE id = ANY($1::" POSTGRES_OSMID_TYPE "[])",
96         .copy = "COPY %p_nodes FROM STDIN;\n",
97      .analyze = "ANALYZE %p_nodes;\n",
98         .stop = "COMMIT;\n"
99    },
100    {
101        //table = t_way,
102         .name = "%p_ways",
103        .start = "BEGIN;\n",
104       .create = "CREATE %m TABLE %p_ways (id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, nodes " POSTGRES_OSMID_TYPE "[] not null, tags text[], pending boolean not null) {TABLESPACE %t};\n",
105 .create_index = "CREATE INDEX %p_ways_idx ON %p_ways (id) {TABLESPACE %i} WHERE pending;\n",
106.array_indexes = "CREATE INDEX %p_ways_nodes ON %p_ways USING gin (nodes) {TABLESPACE %i};\n",
107      .prepare = "PREPARE insert_way (" POSTGRES_OSMID_TYPE ", " POSTGRES_OSMID_TYPE "[], text[], boolean) AS INSERT INTO %p_ways VALUES ($1,$2,$3,$4);\n"
108               "PREPARE get_way (" POSTGRES_OSMID_TYPE ") AS SELECT nodes, tags, array_upper(nodes,1) FROM %p_ways WHERE id = $1;\n"
109               "PREPARE way_done(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_ways SET pending = false WHERE id = $1;\n"
110               "PREPARE pending_ways AS SELECT id FROM %p_ways WHERE pending;\n"
111               "PREPARE delete_way(" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_ways WHERE id = $1;\n",
112.prepare_intarray = "PREPARE node_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_ways SET pending = true WHERE nodes && ARRAY[$1] AND NOT pending;\n",
113         .copy = "COPY %p_ways FROM STDIN;\n",
114      .analyze = "ANALYZE %p_ways;\n",
115         .stop =  "COMMIT;\n"
116    },
117    {
118        //table = t_rel,
119         .name = "%p_rels",
120        .start = "BEGIN;\n",
121       .create = "CREATE %m TABLE %p_rels(id " POSTGRES_OSMID_TYPE " PRIMARY KEY {USING INDEX TABLESPACE %i}, way_off int2, rel_off int2, parts " POSTGRES_OSMID_TYPE "[], members text[], tags text[], pending boolean not null) {TABLESPACE %t};\n",
122 .create_index = "CREATE INDEX %p_rels_idx ON %p_rels (id) {TABLESPACE %i} WHERE pending;\n",
123.array_indexes = "CREATE INDEX %p_rels_parts ON %p_rels USING gin (parts) {TABLESPACE %i};\n",
124      .prepare = "PREPARE insert_rel (" POSTGRES_OSMID_TYPE ", int2, int2, " POSTGRES_OSMID_TYPE "[], text[], text[]) AS INSERT INTO %p_rels VALUES ($1,$2,$3,$4,$5,$6,false);\n"
125               "PREPARE get_rel (" POSTGRES_OSMID_TYPE ") AS SELECT members, tags, array_upper(members,1)/2 FROM %p_rels WHERE id = $1;\n"
126               "PREPARE rel_done(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = false WHERE id = $1;\n"
127               "PREPARE pending_rels AS SELECT id FROM %p_rels WHERE pending;\n"
128               "PREPARE delete_rel(" POSTGRES_OSMID_TYPE ") AS DELETE FROM %p_rels WHERE id = $1;\n",
129.prepare_intarray =
130                "PREPARE node_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[1:way_off] && ARRAY[$1] AND NOT pending;\n"
131                "PREPARE way_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[way_off+1:rel_off] && ARRAY[$1] AND NOT pending;\n"
132                "PREPARE rel_changed_mark(" POSTGRES_OSMID_TYPE ") AS UPDATE %p_rels SET pending = true WHERE parts && ARRAY[$1] AND parts[rel_off+1:array_length(parts,1)] && ARRAY[$1] AND NOT pending;\n",
133         .copy = "COPY %p_rels FROM STDIN;\n",
134      .analyze = "ANALYZE %p_rels;\n",
135         .stop =  "COMMIT;\n"
136    }
137};
138
139static const int num_tables = sizeof(tables)/sizeof(tables[0]);
140static int warn_node_order;
141static struct table_desc *node_table = &tables[t_node];
142static struct table_desc *way_table  = &tables[t_way];
143static struct table_desc *rel_table  = &tables[t_rel];
144
145static int Append;
146
147const struct output_options *out_options;
148
149static int pgsql_connect(const struct output_options *options) {
150    int i;
151    /* We use a connection per table to enable the use of COPY */
152    for (i=0; i<num_tables; i++) {
153        PGconn *sql_conn;
154        sql_conn = PQconnectdb(options->conninfo);
155
156        /* Check to see that the backend connection was successfully made */
157        if (PQstatus(sql_conn) != CONNECTION_OK) {
158            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
159            exit_nicely();
160        }
161        tables[i].sql_conn = sql_conn;
162
163        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET synchronous_commit TO off;");
164
165        if (tables[i].prepare) {
166            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare);
167        }
168
169        if (tables[i].prepare_intarray) {
170            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare_intarray);
171        }
172
173    }
174    return 0;
175}
176
177static void pgsql_cleanup(void)
178{
179    int i;
180
181    for (i=0; i<num_tables; i++) {
182        if (tables[i].sql_conn) {
183            PQfinish(tables[i].sql_conn);
184            tables[i].sql_conn = NULL;
185        }
186    }
187}
188
189char *pgsql_store_nodes(osmid_t *nds, int nd_count)
190{
191  static char *buffer;
192  static int buflen;
193
194  char *ptr;
195  int i, first;
196
197  if( buflen <= nd_count * 10 )
198  {
199    buflen = ((nd_count * 10) | 4095) + 1;  /* Round up to next page */
200    buffer = realloc( buffer, buflen );
201  }
202_restart:
203
204  ptr = buffer;
205  first = 1;
206  *ptr++ = '{';
207  for( i=0; i<nd_count; i++ )
208  {
209    if( !first ) *ptr++ = ',';
210    ptr += sprintf(ptr, "%" PRIdOSMID, nds[i] );
211
212    if( (ptr-buffer) > (buflen-20) ) /* Almost overflowed? */
213    {
214      buflen <<= 1;
215      buffer = realloc( buffer, buflen );
216
217      goto _restart;
218    }
219    first = 0;
220  }
221
222  *ptr++ = '}';
223  *ptr++ = 0;
224
225  return buffer;
226}
227
228/* Special escape routine for escaping strings in array constants: double quote, backslash,newline, tab*/
229static inline char *escape_tag( char *ptr, const char *in, int escape )
230{
231  while( *in )
232  {
233    switch(*in)
234    {
235      case '"':
236        if( escape ) *ptr++ = '\\';
237        *ptr++ = '\\';
238        *ptr++ = '"';
239        break;
240      case '\\':
241        if( escape ) *ptr++ = '\\';
242        if( escape ) *ptr++ = '\\';
243        *ptr++ = '\\';
244        *ptr++ = '\\';
245        break;
246      case '\n':
247        if( escape ) *ptr++ = '\\';
248        *ptr++ = '\\';
249        *ptr++ = 'n';
250        break;
251      case '\r':
252        if( escape ) *ptr++ = '\\';
253        *ptr++ = '\\';
254        *ptr++ = 'r';
255        break;
256      case '\t':
257        if( escape ) *ptr++ = '\\';
258        *ptr++ = '\\';
259        *ptr++ = 't';
260        break;
261      default:
262        *ptr++ = *in;
263        break;
264    }
265    in++;
266  }
267  return ptr;
268}
269
270/* escape means we return '\N' for copy mode, otherwise we return just NULL */
271char *pgsql_store_tags(struct keyval *tags, int escape)
272{
273  static char *buffer;
274  static int buflen;
275
276  char *ptr;
277  struct keyval *i;
278  int first;
279
280  int countlist = countList(tags);
281  if( countlist == 0 )
282  {
283    if( escape )
284      return "\\N";
285    else
286      return NULL;
287  }
288
289  if( buflen <= countlist * 24 ) /* LE so 0 always matches */
290  {
291    buflen = ((countlist * 24) | 4095) + 1;  /* Round up to next page */
292    buffer = realloc( buffer, buflen );
293  }
294_restart:
295
296  ptr = buffer;
297  first = 1;
298  *ptr++ = '{';
299  /* The lists are circular, exit when we reach the head again */
300  for( i=tags->next; i->key; i = i->next )
301  {
302    int maxlen = (strlen(i->key) + strlen(i->value)) * 4;
303    if( (ptr+maxlen-buffer) > (buflen-20) ) /* Almost overflowed? */
304    {
305      buflen <<= 1;
306      buffer = realloc( buffer, buflen );
307
308      goto _restart;
309    }
310    if( !first ) *ptr++ = ',';
311    *ptr++ = '"';
312    ptr = escape_tag( ptr, i->key, escape );
313    *ptr++ = '"';
314    *ptr++ = ',';
315    *ptr++ = '"';
316    ptr = escape_tag( ptr, i->value, escape );
317    *ptr++ = '"';
318
319    first=0;
320  }
321
322  *ptr++ = '}';
323  *ptr++ = 0;
324
325  return buffer;
326}
327
328/* Decodes a portion of an array literal from postgres */
329/* Argument should point to beginning of literal, on return points to delimiter */
330static const char *decode_upto( const char *src, char *dst )
331{
332  int quoted = (*src == '"');
333  if( quoted ) src++;
334
335  while( quoted ? (*src != '"') : (*src != ',' && *src != '}') )
336  {
337    if( *src == '\\' )
338    {
339      switch( src[1] )
340      {
341        case 'n': *dst++ = '\n'; break;
342        case 't': *dst++ = '\t'; break;
343        default: *dst++ = src[1]; break;
344      }
345      src+=2;
346    }
347    else
348      *dst++ = *src++;
349  }
350  if( quoted ) src++;
351  *dst = 0;
352  return src;
353}
354
355static void pgsql_parse_tags( const char *string, struct keyval *tags )
356{
357  char key[1024];
358  char val[1024];
359
360  if( *string == '\0' )
361    return;
362
363//  fprintf( stderr, "Parsing: %s\n", string );
364  if( *string++ != '{' )
365    return;
366  while( *string != '}' )
367  {
368    string = decode_upto( string, key );
369    /* String points to the comma */
370    string++;
371    string = decode_upto( string, val );
372    /* String points to the comma or closing '}' */
373    addItem( tags, key, val, 0 );
374//    fprintf( stderr, "Extracted item: %s=%s\n", key, val );
375    if( *string == ',' )
376      string++;
377  }
378}
379
380/* Parses an array of integers */
381static void pgsql_parse_nodes(const char *src, osmid_t *nds, int nd_count )
382{
383  int count = 0;
384  const char *string = src;
385
386  if( *string++ != '{' )
387    return;
388  while( *string != '}' )
389  {
390    char *ptr;
391    nds[count] = strtoosmid( string, &ptr, 10 );
392    string = ptr;
393    if( *string == ',' )
394      string++;
395    count++;
396  }
397  if( count != nd_count )
398  {
399    fprintf( stderr, "parse_nodes problem: '%s' expected %d got %d\n", src, nd_count, count );
400    exit_nicely();
401  }
402}
403
404static int pgsql_endCopy( struct table_desc *table)
405{
406    /* Terminate any pending COPY */
407     if (table->copyMode) {
408        PGconn *sql_conn = table->sql_conn;
409        int stop = PQputCopyEnd(sql_conn, NULL);
410        if (stop != 1) {
411            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
412            exit_nicely();
413        }
414
415        PGresult *res = PQgetResult(sql_conn);
416        if (PQresultStatus(res) != PGRES_COMMAND_OK) {
417            fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
418            PQclear(res);
419            exit_nicely();
420        }
421        PQclear(res);
422        table->copyMode = 0;
423    }
424    return 0;
425}
426
427static int pgsql_nodes_set(osmid_t id, double lat, double lon, struct keyval *tags)
428{
429    /* Four params: id, lat, lon, tags */
430    char *paramValues[4];
431    char *buffer;
432
433    if( node_table->copyMode )
434    {
435      char *tag_buf = pgsql_store_tags(tags,1);
436      int length = strlen(tag_buf) + 64;
437      buffer = alloca( length );
438#ifdef FIXED_POINT
439      if( snprintf( buffer, length, "%" PRIdOSMID "\t%d\t%d\t%s\n", id, DOUBLE_TO_FIX(lat), DOUBLE_TO_FIX(lon), tag_buf ) > (length-10) )
440      { fprintf( stderr, "buffer overflow node id %" PRIdOSMID "\n", id); return 1; }
441#else
442      if( snprintf( buffer, length, "%" PRIdOSMID "\t%.10f\t%.10f\t%s\n", id, lat, lon, tag_buf ) > (length-10) )
443      { fprintf( stderr, "buffer overflow node id %" PRIdOSMID "\n", id); return 1; }
444#endif
445      return pgsql_CopyData(__FUNCTION__, node_table->sql_conn, buffer);
446    }
447    buffer = alloca(64);
448    paramValues[0] = buffer;
449    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, id ) + 1;
450#ifdef FIXED_POINT
451    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", DOUBLE_TO_FIX(lat) ) + 1;
452    sprintf( paramValues[2], "%d", DOUBLE_TO_FIX(lon) );
453#else
454    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%.10f", lat ) + 1;
455    sprintf( paramValues[2], "%.10f", lon );
456#endif
457    paramValues[3] = pgsql_store_tags(tags,0);
458    pgsql_execPrepared(node_table->sql_conn, "insert_node", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
459    return 0;
460}
461
462static int middle_nodes_set(osmid_t id, double lat, double lon, struct keyval *tags) {
463    ram_cache_nodes_set( id, lat, lon, tags );
464
465    return (out_options->flat_node_cache_enabled) ? persistent_cache_nodes_set(id, lat, lon) : pgsql_nodes_set(id, lat, lon, tags);
466}
467
468static int pgsql_nodes_get(struct osmNode *out, osmid_t id)
469{
470    PGresult   *res;
471    char tmp[16];
472    char const *paramValues[1];
473    PGconn *sql_conn = node_table->sql_conn;
474
475    /* Make sure we're out of copy mode */
476    pgsql_endCopy( node_table );
477
478    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
479    paramValues[0] = tmp;
480
481    res = pgsql_execPrepared(sql_conn, "get_node", 1, paramValues, PGRES_TUPLES_OK);
482
483    if (PQntuples(res) != 1) {
484        PQclear(res);
485        return 1;
486    }
487
488#ifdef FIXED_POINT
489    out->lat = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 0), NULL, 10));
490    out->lon = FIX_TO_DOUBLE(strtol(PQgetvalue(res, 0, 1), NULL, 10));
491#else
492    out->lat = strtod(PQgetvalue(res, 0, 0), NULL);
493    out->lon = strtod(PQgetvalue(res, 0, 1), NULL);
494#endif
495    PQclear(res);
496    return 0;
497}
498
499static int middle_nodes_get(struct osmNode *out, osmid_t id)
500{
501    /* Check cache first */
502    if( ram_cache_nodes_get( out, id ) == 0 )
503        return 0;
504
505    return (out_options->flat_node_cache_enabled) ? persistent_cache_nodes_get(out, id) : pgsql_nodes_get(out, id);
506}
507
508/* This should be made more efficient by using an IN(ARRAY[]) construct */
509static int pgsql_nodes_get_list(struct osmNode *nodes, osmid_t *ndids, int nd_count)
510{
511    char tmp[16];
512    char *tmp2;
513    int count, count2, countDB, countPG, i,j;
514    osmid_t id;
515    osmid_t *ndidspg;
516    struct osmNode *nodespg;
517    struct osmNode *nodes2;
518    char const *paramValues[1];
519
520    PGresult *res;
521    PGconn *sql_conn = node_table->sql_conn;
522
523    count = 0; countDB = 0;
524
525    tmp2 = malloc(sizeof(char)*nd_count*16);
526    if (tmp2 == NULL) return 0; /*failed to allocate memory, return */
527
528    /* create a list of ids in tmp2 to query the database  */
529    snprintf(tmp2, sizeof(tmp2), "{");
530    for( i=0; i<nd_count; i++ ) {
531        /* Check cache first */
532        if( ram_cache_nodes_get( &nodes[i], ndids[i]) == 0 ) {
533            count++;
534            continue;
535        }
536        countDB++;
537        /* Mark nodes as needing to be fetched from the DB */
538        nodes[i].lat = NAN;
539        nodes[i].lon = NAN;
540        snprintf(tmp, sizeof(tmp), "%" PRIdOSMID ",", ndids[i]);
541        strcat(tmp2,tmp);
542    }
543    tmp2[strlen(tmp2) - 1] = '}'; /* replace last , with } to complete list of ids*/
544
545    if (countDB == 0) {
546        free(tmp2);
547        return count; /* All ids where in cache, so nothing more to do */
548    }
549
550    pgsql_endCopy(node_table);
551
552    paramValues[0] = tmp2;
553    res = pgsql_execPrepared(sql_conn, "get_node_list", 1, paramValues, PGRES_TUPLES_OK);
554    countPG = PQntuples(res);
555
556    ndidspg = malloc(sizeof(osmid_t)*countPG);
557    nodespg = malloc(sizeof(struct osmNode)*countPG);
558
559    if ((ndidspg == NULL) || (nodespg == NULL)) {
560        free(tmp2);
561        free(ndidspg);
562        free(nodespg);
563        PQclear(res);
564        return 0;
565    }
566
567    for (i = 0; i < countPG; i++) {
568        ndidspg[i] = strtoosmid(PQgetvalue(res, i, 0), NULL, 10);
569#ifdef FIXED_POINT
570        nodespg[i].lat = FIX_TO_DOUBLE(strtol(PQgetvalue(res, i, 1), NULL, 10));
571        nodespg[i].lon = FIX_TO_DOUBLE(strtol(PQgetvalue(res, i, 2), NULL, 10));
572#else
573        nodespg[i].lat = strtod(PQgetvalue(res, i, 1), NULL);
574        nodespg[i].lon = strtod(PQgetvalue(res, i, 2), NULL);
575#endif
576    }
577
578
579    /* The list of results coming back from the db is in a different order to the list of nodes in the way.
580       Match the results back to the way node list */
581
582    for (i=0; i<nd_count; i++ ) {
583        if ((isnan(nodes[i].lat)) || (isnan(nodes[i].lon))) {
584            /* TODO: implement an O(log(n)) algorithm to match node ids */
585            for (j = 0; j < countPG; j++) {
586                if (ndidspg[j] == ndids[i]) {
587                    nodes[i].lat = nodespg[j].lat;
588                    nodes[i].lon = nodespg[j].lon;
589                    count++;
590                    break;
591                }
592            }
593        }
594    }
595
596    /* If some of the nodes in the way don't exist, the returning list has holes.
597       As the rest of the code expects a continuous list, it needs to be re-compacted */
598    if (count != nd_count) {
599        j = 0;
600        for (i = 0; i < nd_count; i++) {
601            if ( !isnan(nodes[i].lat)) {
602                nodes[j].lat = nodes[i].lat;
603                nodes[j].lon = nodes[i].lon;
604                j++;
605            }
606         }
607    }
608
609    PQclear(res);
610    free(tmp2);
611    free(ndidspg);
612    free(nodespg);
613
614    return count;
615}
616
617static int middle_nodes_get_list(struct osmNode *nodes, osmid_t *ndids, int nd_count)
618{
619    return (out_options->flat_node_cache_enabled) ? persistent_cache_nodes_get_list(nodes, ndids, nd_count) : pgsql_nodes_get_list(nodes, ndids, nd_count);
620}
621
622static int pgsql_nodes_delete(osmid_t osm_id)
623{
624    char const *paramValues[1];
625    char buffer[64];
626    /* Make sure we're out of copy mode */
627    pgsql_endCopy( node_table );
628
629    sprintf( buffer, "%" PRIdOSMID, osm_id );
630    paramValues[0] = buffer;
631    pgsql_execPrepared(node_table->sql_conn, "delete_node", 1, paramValues, PGRES_COMMAND_OK );
632    return 0;
633}
634
635static int middle_nodes_delete(osmid_t osm_id)
636{
637    return ((out_options->flat_node_cache_enabled) ? persistent_cache_nodes_set(osm_id, NAN, NAN) : pgsql_nodes_delete(osm_id));
638}
639
640static int pgsql_node_changed(osmid_t osm_id)
641{
642    char const *paramValues[1];
643    char buffer[64];
644    /* Make sure we're out of copy mode */
645    pgsql_endCopy( way_table );
646    pgsql_endCopy( rel_table );
647
648    sprintf( buffer, "%" PRIdOSMID, osm_id );
649    paramValues[0] = buffer;
650    pgsql_execPrepared(way_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
651    pgsql_execPrepared(rel_table->sql_conn, "node_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
652    return 0;
653}
654
655static int pgsql_ways_set(osmid_t way_id, osmid_t *nds, int nd_count, struct keyval *tags, int pending)
656{
657    /* Three params: id, nodes, tags, pending */
658    char *paramValues[4];
659    char *buffer;
660
661    if( way_table->copyMode )
662    {
663      char *tag_buf = pgsql_store_tags(tags,1);
664      char *node_buf = pgsql_store_nodes(nds, nd_count);
665      int length = strlen(tag_buf) + strlen(node_buf) + 64;
666      buffer = alloca(length);
667      if( snprintf( buffer, length, "%" PRIdOSMID "\t%s\t%s\t%c\n",
668              way_id, node_buf, tag_buf, pending?'t':'f' ) > (length-10) )
669      { fprintf( stderr, "buffer overflow way id %" PRIdOSMID "\n", way_id); return 1; }
670      return pgsql_CopyData(__FUNCTION__, way_table->sql_conn, buffer);
671    }
672    buffer = alloca(64);
673    paramValues[0] = buffer;
674    paramValues[3] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, way_id ) + 1;
675    sprintf( paramValues[3], "%c", pending?'t':'f' );
676    paramValues[1] = pgsql_store_nodes(nds, nd_count);
677    paramValues[2] = pgsql_store_tags(tags,0);
678    pgsql_execPrepared(way_table->sql_conn, "insert_way", 4, (const char * const *)paramValues, PGRES_COMMAND_OK);
679    return 0;
680}
681
682/* Caller is responsible for freeing nodesptr & resetList(tags) */
683static int pgsql_ways_get(osmid_t id, struct keyval *tags, struct osmNode **nodes_ptr, int *count_ptr)
684{
685    PGresult   *res;
686    char tmp[16];
687    char const *paramValues[1];
688    PGconn *sql_conn = way_table->sql_conn;
689
690    /* Make sure we're out of copy mode */
691    pgsql_endCopy( way_table );
692
693    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
694    paramValues[0] = tmp;
695
696    res = pgsql_execPrepared(sql_conn, "get_way", 1, paramValues, PGRES_TUPLES_OK);
697
698    if (PQntuples(res) != 1) {
699        PQclear(res);
700        return 1;
701    }
702
703    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
704
705    int num_nodes = strtol(PQgetvalue(res, 0, 2), NULL, 10);
706    osmid_t *list = alloca(sizeof(osmid_t)*num_nodes );
707    *nodes_ptr = malloc(sizeof(struct osmNode) * num_nodes);
708    pgsql_parse_nodes( PQgetvalue(res, 0, 0), list, num_nodes);
709
710    *count_ptr = out_options->flat_node_cache_enabled ?
711                persistent_cache_nodes_get_list(*nodes_ptr, list, num_nodes) :
712                pgsql_nodes_get_list( *nodes_ptr, list, num_nodes);
713    PQclear(res);
714    return 0;
715}
716
717static int pgsql_ways_done(osmid_t id)
718{
719    char tmp[16];
720    char const *paramValues[1];
721    PGconn *sql_conn = way_table->sql_conn;
722
723    /* Make sure we're out of copy mode */
724    pgsql_endCopy( way_table );
725
726    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
727    paramValues[0] = tmp;
728
729    pgsql_execPrepared(sql_conn, "way_done", 1, paramValues, PGRES_COMMAND_OK);
730
731    return 0;
732}
733
734static int pgsql_ways_delete(osmid_t osm_id)
735{
736    char const *paramValues[1];
737    char buffer[64];
738    /* Make sure we're out of copy mode */
739    pgsql_endCopy( way_table );
740
741    sprintf( buffer, "%" PRIdOSMID, osm_id );
742    paramValues[0] = buffer;
743    pgsql_execPrepared(way_table->sql_conn, "delete_way", 1, paramValues, PGRES_COMMAND_OK );
744    return 0;
745}
746
747static void pgsql_iterate_ways(int (*callback)(osmid_t id, struct keyval *tags, struct osmNode *nodes, int count, int exists))
748{
749    int noProcs = out_options->num_procs;
750    int pid = 0;
751    PGresult   *res_ways;
752    int i, p, count = 0;
753    /* The flag we pass to indicate that the way in question might exist already in the database */
754    int exists = Append;
755
756    time_t start, end;
757    time(&start);
758#if HAVE_MMAP
759    struct progress_info *info = 0;
760    if(noProcs > 1)
761        info = mmap(0, sizeof(struct progress_info)*noProcs, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
762#endif
763    fprintf(stderr, "\nGoing over pending ways...\n");
764
765    /* Make sure we're out of copy mode */
766    pgsql_endCopy( way_table );
767
768    if (out_options->flat_node_cache_enabled) shutdown_node_persistent_cache();
769
770    res_ways = pgsql_execPrepared(way_table->sql_conn, "pending_ways", 0, NULL, PGRES_TUPLES_OK);
771
772    fprintf(stderr, "\t%i ways are pending\n", PQntuples(res_ways));
773
774
775    /**
776     * To speed up processing of pending ways, fork noProcs worker processes
777     * each of which independently goes through an equal subset of the pending ways array
778     */
779    fprintf(stderr, "\nUsing %i helper-processes\n", noProcs);
780    for (p = 1; p < noProcs; p++) {
781        pid=fork();
782        if (pid==0) break;
783        if (pid==-1) {
784            fprintf(stderr,"WARNING: Failed to fork helper processes. Falling back to only using %i \n", p);
785            noProcs = p;
786            break;
787        }
788    }
789    if ((pid == 0) && (noProcs > 1)) {
790        /* After forking, need to reconnect to the postgresql db */
791        out_options->out->connect(out_options, 1);
792        pgsql_connect(out_options);
793    } else {
794        p = 0;
795    }
796
797    if (out_options->flat_node_cache_enabled) init_node_persistent_cache(out_options,1,(noProcs > 1?0:1)); //at this point we always want to be in append mode, to not delete and recreate the node cache file
798
799    /* Only start an extended transaction on the ways table,
800     * which should cover the bulk of the update statements.
801     * The nodes table should not be written to in this phase.
802     * The relations table can't be wrapped in an extended
803     * transaction, as with prallel processing it may deadlock.
804     * Updating a way will trigger an update of the pending status
805     * on connected relations. This should not be as many updates,
806     * so in combination with the synchronous_comit = off it should be fine.
807     *
808     */
809    if (tables[t_way].start) {
810        pgsql_endCopy(&tables[t_way]);
811        pgsql_exec(tables[t_way].sql_conn, PGRES_COMMAND_OK, "%s", tables[t_way].start);
812        tables[t_way].transactionMode = 1;
813    }
814
815    //fprintf(stderr, "\nIterating ways\n");
816    /* Use a stride length of the number of worker processes,
817       starting with an offset for each worker process p */
818    for (i = p; i < PQntuples(res_ways); i+= noProcs) {
819        osmid_t id = strtoosmid(PQgetvalue(res_ways, i, 0), NULL, 10);
820        struct keyval tags;
821        struct osmNode *nodes;
822        int nd_count;
823
824        if (count++ %1000 == 0) {
825            time(&end);
826#if HAVE_MMAP
827            if(info)
828            {
829                double rate = 0;
830                int n, total = 0, finished = 0;
831                struct progress_info f;
832
833                f.start = start;
834                f.end = end;
835                f.count = count;
836                f.finished = 0;
837                info[p] = f;
838                for(n = 0; n < noProcs; ++n)
839                {
840                    f = info[n];
841                    total += f.count;
842                    finished += f.finished;
843                    if(f.end > f.start)
844                        rate += (double)f.count / (double)(f.end - f.start);
845                }
846                fprintf(stderr, "\rprocessing way (%dk) at %.2fk/s (done %d of %d)", total/1000, rate/1000.0, finished, noProcs);
847            }
848            else
849#endif
850            {
851                fprintf(stderr, "\rprocessing way (%dk) at %.2fk/s", count/1000,
852                end > start ? ((double)count / 1000.0 / (double)(end - start)) : 0);
853            }
854        }
855
856        initList(&tags);
857        if( pgsql_ways_get(id, &tags, &nodes, &nd_count) )
858          continue;
859
860        callback(id, &tags, nodes, nd_count, exists);
861        pgsql_ways_done( id );
862
863        free(nodes);
864        resetList(&tags);
865    }
866
867    if (tables[t_way].stop && tables[t_way].transactionMode) {
868        pgsql_exec(tables[t_way].sql_conn, PGRES_COMMAND_OK, "%s", tables[t_way].stop);
869        tables[t_way].transactionMode = 0;
870    }
871
872    time(&end);
873#if HAVE_MMAP
874    if(info)
875    {
876        struct progress_info f;
877        f.start = start;
878        f.end = end;
879        f.count = count;
880        f.finished = 1;
881        info[p] = f;
882    }
883#endif
884    fprintf(stderr, "\rProcess %i finished processing %i ways in %i sec\n", p, count, (int)(end - start));
885
886    if ((pid == 0) && (noProcs > 1)) {
887        pgsql_cleanup();
888        out_options->out->close(1);
889        if (out_options->flat_node_cache_enabled) shutdown_node_persistent_cache();
890        exit(0);
891    } else {
892        for (p = 0; p < noProcs; p++) wait(NULL);
893        fprintf(stderr, "\nAll child processes exited\n");
894    }
895
896#if HAVE_MMAP
897    munmap(info, sizeof(struct progress_info)*noProcs);
898#endif
899
900    fprintf(stderr, "\n");
901    time(&end);
902    if (end - start > 0)
903        fprintf(stderr, "%i Pending ways took %ds at a rate of %.2f/s\n",PQntuples(res_ways), (int)(end - start),
904                ((double)PQntuples(res_ways) / (double)(end - start)));
905    PQclear(res_ways);
906}
907
908static int pgsql_way_changed(osmid_t osm_id)
909{
910    char const *paramValues[1];
911    char buffer[64];
912    /* Make sure we're out of copy mode */
913    pgsql_endCopy( rel_table );
914
915    sprintf( buffer, "%" PRIdOSMID, osm_id );
916    paramValues[0] = buffer;
917    pgsql_execPrepared(rel_table->sql_conn, "way_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
918    return 0;
919}
920
921static int pgsql_rels_set(osmid_t id, struct member *members, int member_count, struct keyval *tags)
922{
923    /* Params: id, way_off, rel_off, parts, members, tags */
924    char *paramValues[6];
925    char *buffer;
926    int i;
927    struct keyval member_list;
928
929    osmid_t node_parts[member_count],
930            way_parts[member_count],
931            rel_parts[member_count];
932    int node_count = 0, way_count = 0, rel_count = 0;
933
934    osmid_t all_parts[member_count];
935    int all_count = 0;
936    initList( &member_list );
937    for( i=0; i<member_count; i++ )
938    {
939      char tag = 0;
940      switch( members[i].type )
941      {
942        case OSMTYPE_NODE:     node_parts[node_count++] = members[i].id; tag = 'n'; break;
943        case OSMTYPE_WAY:      way_parts[way_count++] = members[i].id; tag = 'w'; break;
944        case OSMTYPE_RELATION: rel_parts[rel_count++] = members[i].id; tag = 'r'; break;
945        default: fprintf( stderr, "Internal error: Unknown member type %d\n", members[i].type ); exit_nicely();
946      }
947      char buf[64];
948      sprintf( buf, "%c%" PRIdOSMID, tag, members[i].id );
949      addItem( &member_list, buf, members[i].role, 0 );
950    }
951    memcpy( all_parts+all_count, node_parts, node_count*sizeof(osmid_t) ); all_count+=node_count;
952    memcpy( all_parts+all_count, way_parts, way_count*sizeof(osmid_t) ); all_count+=way_count;
953    memcpy( all_parts+all_count, rel_parts, rel_count*sizeof(osmid_t) ); all_count+=rel_count;
954
955    if( rel_table->copyMode )
956    {
957      char *tag_buf = strdup(pgsql_store_tags(tags,1));
958      char *member_buf = pgsql_store_tags(&member_list,1);
959      char *parts_buf = pgsql_store_nodes(all_parts, all_count);
960      int length = strlen(member_buf) + strlen(tag_buf) + strlen(parts_buf) + 64;
961      buffer = alloca(length);
962      if( snprintf( buffer, length, "%" PRIdOSMID "\t%d\t%d\t%s\t%s\t%s\tf\n",
963              id, node_count, node_count+way_count, parts_buf, member_buf, tag_buf ) > (length-10) )
964      { fprintf( stderr, "buffer overflow relation id %" PRIdOSMID "\n", id); return 1; }
965      free(tag_buf);
966      resetList(&member_list);
967      return pgsql_CopyData(__FUNCTION__, rel_table->sql_conn, buffer);
968    }
969    buffer = alloca(64);
970    paramValues[0] = buffer;
971    paramValues[1] = paramValues[0] + sprintf( paramValues[0], "%" PRIdOSMID, id ) + 1;
972    paramValues[2] = paramValues[1] + sprintf( paramValues[1], "%d", node_count ) + 1;
973    sprintf( paramValues[2], "%d", node_count+way_count );
974    paramValues[3] = pgsql_store_nodes(all_parts, all_count);
975    paramValues[4] = pgsql_store_tags(&member_list,0);
976    if( paramValues[4] )
977        paramValues[4] = strdup(paramValues[4]);
978    paramValues[5] = pgsql_store_tags(tags,0);
979    pgsql_execPrepared(rel_table->sql_conn, "insert_rel", 6, (const char * const *)paramValues, PGRES_COMMAND_OK);
980    if( paramValues[4] )
981        free(paramValues[4]);
982    resetList(&member_list);
983    return 0;
984}
985
986/* Caller is responsible for freeing members & resetList(tags) */
987static int pgsql_rels_get(osmid_t id, struct member **members, int *member_count, struct keyval *tags)
988{
989    PGresult   *res;
990    char tmp[16];
991    char const *paramValues[1];
992    PGconn *sql_conn = rel_table->sql_conn;
993    struct keyval member_temp;
994
995    /* Make sure we're out of copy mode */
996    pgsql_endCopy( rel_table );
997
998    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
999    paramValues[0] = tmp;
1000
1001    res = pgsql_execPrepared(sql_conn, "get_rel", 1, paramValues, PGRES_TUPLES_OK);
1002    /* Fields are: members, tags, member_count */
1003
1004    if (PQntuples(res) != 1) {
1005        PQclear(res);
1006        return 1;
1007    }
1008
1009    pgsql_parse_tags( PQgetvalue(res, 0, 1), tags );
1010    initList(&member_temp);
1011    pgsql_parse_tags( PQgetvalue(res, 0, 0), &member_temp );
1012
1013    int num_members = strtol(PQgetvalue(res, 0, 2), NULL, 10);
1014    struct member *list = malloc( sizeof(struct member)*num_members );
1015
1016    int i=0;
1017    struct keyval *item;
1018    while( (item = popItem(&member_temp)) )
1019    {
1020        if( i >= num_members )
1021        {
1022            fprintf(stderr, "Unexpected member_count reading relation %" PRIdOSMID "\n", id);
1023            exit_nicely();
1024        }
1025        char tag = item->key[0];
1026        list[i].type = (tag == 'n')?OSMTYPE_NODE:(tag == 'w')?OSMTYPE_WAY:(tag == 'r')?OSMTYPE_RELATION:-1;
1027        list[i].id = strtoosmid(item->key+1, NULL, 10 );
1028        list[i].role = strdup( item->value );
1029        freeItem(item);
1030        i++;
1031    }
1032    *members = list;
1033    *member_count = num_members;
1034    PQclear(res);
1035    return 0;
1036}
1037
1038static int pgsql_rels_done(osmid_t id)
1039{
1040    char tmp[16];
1041    char const *paramValues[1];
1042    PGconn *sql_conn = rel_table->sql_conn;
1043
1044    /* Make sure we're out of copy mode */
1045    pgsql_endCopy( rel_table );
1046
1047    snprintf(tmp, sizeof(tmp), "%" PRIdOSMID, id);
1048    paramValues[0] = tmp;
1049
1050    pgsql_execPrepared(sql_conn, "rel_done", 1, paramValues, PGRES_COMMAND_OK);
1051
1052    return 0;
1053}
1054
1055static int pgsql_rels_delete(osmid_t osm_id)
1056{
1057    char const *paramValues[1];
1058    char buffer[64];
1059    /* Make sure we're out of copy mode */
1060    pgsql_endCopy( rel_table );
1061
1062    sprintf( buffer, "%" PRIdOSMID, osm_id );
1063    paramValues[0] = buffer;
1064    pgsql_execPrepared(rel_table->sql_conn, "delete_rel", 1, paramValues, PGRES_COMMAND_OK );
1065    return 0;
1066}
1067
1068static void pgsql_iterate_relations(int (*callback)(osmid_t id, struct member *members, int member_count, struct keyval *tags, int exists))
1069{
1070    PGresult   *res_rels;
1071    int noProcs = out_options->num_procs;
1072    int pid;
1073    int i, p, count = 0;
1074    /* The flag we pass to indicate that the way in question might exist already in the database */
1075    int exists = Append;
1076
1077    time_t start, end;
1078    time(&start);
1079#if HAVE_MMAP
1080    struct progress_info *info = 0;
1081    if(noProcs > 1)
1082        info = mmap(0, sizeof(struct progress_info)*noProcs, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
1083#endif
1084    fprintf(stderr, "\nGoing over pending relations...\n");
1085
1086    /* Make sure we're out of copy mode */
1087    pgsql_endCopy( rel_table );
1088
1089    if (out_options->flat_node_cache_enabled) shutdown_node_persistent_cache();
1090
1091    res_rels = pgsql_execPrepared(rel_table->sql_conn, "pending_rels", 0, NULL, PGRES_TUPLES_OK);
1092
1093    fprintf(stderr, "\t%i relations are pending\n", PQntuples(res_rels));
1094
1095    fprintf(stderr, "\nUsing %i helper-processes\n", noProcs);
1096    for (p = 1; p < noProcs; p++) {
1097        pid=fork();
1098        if (pid==0) break;
1099        if (pid==-1) {
1100            fprintf(stderr,"WARNING: Failed to fork helper processes. Falling back to only using %i \n", p);
1101            noProcs = p;
1102            break;
1103        }
1104    }
1105    if ((pid == 0) && (noProcs > 1)) {
1106        out_options->out->connect(out_options, 0);
1107        pgsql_connect(out_options);
1108    } else {
1109        p = 0;
1110    }
1111
1112    if (out_options->flat_node_cache_enabled) init_node_persistent_cache(out_options, 1, (noProcs > 1?0:1)); //at this point we always want to be in append mode, to not delete and recreate the node cache file
1113
1114    //fprintf(stderr, "\nIterating ways\n");
1115    for (i = p; i < PQntuples(res_rels); i+= noProcs) {
1116        osmid_t id = strtoosmid(PQgetvalue(res_rels, i, 0), NULL, 10);
1117        struct keyval tags;
1118        struct member *members;
1119        int member_count;
1120
1121        if (count++ %10 == 0) {
1122            time(&end);
1123#if HAVE_MMAP
1124            if(info)
1125            {
1126                double rate = 0;
1127                int n, total = 0, finished = 0;
1128                struct progress_info f;
1129
1130                f.start = start;
1131                f.end = end;
1132                f.count = count;
1133                f.finished = 0;
1134                info[p] = f;
1135                for(n = 0; n < noProcs; ++n)
1136                {
1137                    f = info[n];
1138                    total += f.count;
1139                    finished += f.finished;
1140                    if(f.end > f.start)
1141                        rate += (double)f.count / (double)(f.end - f.start);
1142                }
1143                fprintf(stderr, "\rprocessing relation (%d) at %.2f/s (done %d of %d)", total, rate, finished, noProcs);
1144            }
1145            else
1146#endif
1147            {
1148                fprintf(stderr, "\rprocessing relation (%d) at %.2f/s", count,
1149                        end > start ? ((double)count / (double)(end - start)) : 0);
1150            }
1151        }
1152
1153        initList(&tags);
1154        if( pgsql_rels_get(id, &members, &member_count, &tags) )
1155          continue;
1156
1157        callback(id, members, member_count, &tags, exists);
1158        pgsql_rels_done( id );
1159
1160        free(members);
1161        resetList(&tags);
1162    }
1163    time(&end);
1164#if HAVE_MMAP
1165    if(info)
1166    {
1167        struct progress_info f;
1168        f.start = start;
1169        f.end = end;
1170        f.count = count;
1171        f.finished = 1;
1172        info[p] = f;
1173    }
1174#endif
1175    fprintf(stderr, "\rProcess %i finished processing %i relations in %i sec\n", p, count, (int)(end - start));
1176
1177    if ((pid == 0) && (noProcs > 1)) {
1178        pgsql_cleanup();
1179        out_options->out->close(0);
1180        if (out_options->flat_node_cache_enabled) shutdown_node_persistent_cache();
1181        exit(0);
1182    } else {
1183        for (p = 0; p < noProcs; p++) wait(NULL);
1184        fprintf(stderr, "\nAll child processes exited\n");
1185    }
1186
1187#if HAVE_MMAP
1188    munmap(info, sizeof(struct progress_info)*noProcs);
1189#endif
1190    time(&end);
1191    if (end - start > 0)
1192        fprintf(stderr, "%i Pending relations took %ds at a rate of %.2f/s\n",PQntuples(res_rels), (int)(end - start), ((double)PQntuples(res_rels) / (double)(end - start)));
1193    PQclear(res_rels);
1194    fprintf(stderr, "\n");
1195
1196}
1197
1198static int pgsql_rel_changed(osmid_t osm_id)
1199{
1200    char const *paramValues[1];
1201    char buffer[64];
1202    /* Make sure we're out of copy mode */
1203    pgsql_endCopy( rel_table );
1204
1205    sprintf( buffer, "%" PRIdOSMID, osm_id );
1206    paramValues[0] = buffer;
1207    pgsql_execPrepared(rel_table->sql_conn, "rel_changed_mark", 1, paramValues, PGRES_COMMAND_OK );
1208    return 0;
1209}
1210
1211static void pgsql_analyze(void)
1212{
1213    int i;
1214
1215    for (i=0; i<num_tables; i++) {
1216        PGconn *sql_conn = tables[i].sql_conn;
1217
1218        if (tables[i].analyze) {
1219            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].analyze);
1220        }
1221    }
1222}
1223
1224static void pgsql_end(void)
1225{
1226    int i;
1227
1228    for (i=0; i<num_tables; i++) {
1229        PGconn *sql_conn = tables[i].sql_conn;
1230
1231        // Commit transaction
1232        if (tables[i].stop && tables[i].transactionMode) {
1233            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].stop);
1234            tables[i].transactionMode = 0;
1235        }
1236
1237    }
1238}
1239
1240/**
1241 * Helper to create SQL queries.
1242 *
1243 * The input string is mangled as follows:
1244 * %p replaced by the content of the "prefix" option
1245 * %i replaced by the content of the "tblsslim_data" option
1246 * %t replaced by the content of the "tblssslim_index" option
1247 * %m replaced by "UNLOGGED" if the "unlogged" option is set
1248 * other occurrences of the "%" char are treated normally.
1249 * any occurrence of { or } will be ignored (not copied to output string);
1250 * anything inside {} is only copied if it contained at least one of
1251 * %p, %i, %t, %m that was not NULL.
1252 *
1253 * So, the input string
1254 *    Hello{ dear %i}!
1255 * will, if i is set to "John", translate to
1256 *    Hello dear John!
1257 * but if i is unset, translate to
1258 *    Hello!
1259 *
1260 * This is used for constructing SQL queries with proper tablespace settings.
1261 */
1262static inline void set_prefix_and_tbls(const struct output_options *options, const char **string)
1263{
1264    char buffer[1024];
1265    if (*string == NULL) return;
1266    const char *source = *string;
1267    char *dest = buffer;
1268    char *openbrace = NULL;
1269    int copied = 0;
1270
1271    while (*source) {
1272        if (*source == '{') {
1273            openbrace = dest;
1274            copied = 0;
1275            source++;
1276            continue;
1277        } else if (*source == '}') {
1278            if (!copied && openbrace) dest = openbrace;
1279            source++;
1280            continue;
1281        } else if (*source == '%') {
1282            if (*(source+1) == 'p') {
1283                if (options->prefix) {
1284                    strcpy(dest, options->prefix);
1285                    dest += strlen(options->prefix);
1286                    copied = 1;
1287                }
1288                source+=2;
1289                continue;
1290            } else if (*(source+1) == 't') {
1291                if (options->tblsslim_data) {
1292                    strcpy(dest, options->tblsslim_data);
1293                    dest += strlen(options->tblsslim_data);
1294                    copied = 1;
1295                }
1296                source+=2;
1297                continue;
1298            } else if (*(source+1) == 'i') {
1299                if (options->tblsslim_index) {
1300                    strcpy(dest, options->tblsslim_index);
1301                    dest += strlen(options->tblsslim_index);
1302                    copied = 1;
1303                }
1304                source+=2;
1305                continue;
1306            } else if (*(source+1) == 'm') {
1307                if (options->unlogged) {
1308                    strcpy(dest, "UNLOGGED");
1309                    dest += 8;
1310                    copied = 1;
1311                }
1312                source+=2;
1313                continue;
1314            }
1315        }
1316        *(dest++) = *(source++);
1317    }
1318    *dest = 0;
1319    *string = strdup(buffer);
1320}
1321
1322static int build_indexes;
1323
1324
1325static int pgsql_start(const struct output_options *options)
1326{
1327    PGresult   *res;
1328    int i;
1329    int dropcreate = !options->append;
1330    char * sql;
1331
1332    scale = options->scale;
1333    Append = options->append;
1334
1335    out_options = options;
1336
1337    init_node_ram_cache( options->alloc_chunkwise | ALLOC_LOSSY, options->cache, scale);
1338    if (options->flat_node_cache_enabled) init_node_persistent_cache(options, options->append, 1);
1339
1340    fprintf(stderr, "Mid: pgsql, scale=%d cache=%d\n", scale, options->cache);
1341
1342    /* We use a connection per table to enable the use of COPY */
1343    for (i=0; i<num_tables; i++) {
1344        PGconn *sql_conn;
1345
1346        set_prefix_and_tbls(options, &(tables[i].name));
1347        set_prefix_and_tbls(options, &(tables[i].start));
1348        set_prefix_and_tbls(options, &(tables[i].create));
1349        set_prefix_and_tbls(options, &(tables[i].create_index));
1350        set_prefix_and_tbls(options, &(tables[i].prepare));
1351        set_prefix_and_tbls(options, &(tables[i].prepare_intarray));
1352        set_prefix_and_tbls(options, &(tables[i].copy));
1353        set_prefix_and_tbls(options, &(tables[i].analyze));
1354        set_prefix_and_tbls(options, &(tables[i].stop));
1355        set_prefix_and_tbls(options, &(tables[i].array_indexes));
1356
1357        fprintf(stderr, "Setting up table: %s\n", tables[i].name);
1358        sql_conn = PQconnectdb(options->conninfo);
1359
1360        /* Check to see that the backend connection was successfully made */
1361        if (PQstatus(sql_conn) != CONNECTION_OK) {
1362            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
1363            exit_nicely();
1364        }
1365        tables[i].sql_conn = sql_conn;
1366
1367        /*
1368         * To allow for parallelisation, the second phase (iterate_ways), cannot be run
1369         * in an extended transaction and each update statement is its own transaction.
1370         * Therefore commit rate of postgresql is very important to ensure high speed.
1371         * If fsync is enabled to ensure safe transactions, the commit rate can be very low.
1372         * To compensate for this, one can set the postgresql parameter synchronous_commit
1373         * to off. This means an update statement returns to the client as success before the
1374         * transaction is saved to disk via fsync, which in return allows to bunch up multiple
1375         * transactions into a single fsync. This may result in some data loss in the case of a
1376         * database crash. However, as we don't currently have the ability to restart a full osm2pgsql
1377         * import session anyway, this is fine. Diff imports are also not effected, as the next
1378         * diff import would simply deal with all pending ways that were not previously finished.
1379         * This parameter does not effect safety from data corruption on the back-end.
1380         */
1381        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET synchronous_commit TO off;");
1382
1383        /* Not really the right place for this test, but we need a live
1384         * connection that not used for anything else yet, and we'd like to
1385         * warn users *before* we start doing mountains of work */
1386        if (i == t_node)
1387        {
1388            res = PQexec(sql_conn, "select 1 from pg_opclass where opcname='gist__intbig_ops'" );
1389            if(PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
1390            {
1391                /* intarray is problematic now; causes at least postgres 8.4
1392                 * to not use the index on nodes[]/parts[] which slows diff
1393                 * updates to a crawl!
1394                 * If someone find a way to fix this rather than bow out here,
1395                 * please do.*/
1396
1397                fprintf(stderr,
1398                    "\n"
1399                    "The target database has the intarray contrib module loaded.\n"
1400                    "While required for earlier versions of osm2pgsql, intarray \n"
1401                    "is now unnecessary and will interfere with osm2pgsql's array\n"
1402                    "handling. Please use a database without intarray.\n\n");
1403                exit_nicely();
1404            }
1405            PQclear(res);
1406
1407            if (options->append)
1408            {
1409                sql = malloc (2048);
1410                snprintf(sql, 2047, "SELECT id FROM %s LIMIT 1", tables[t_node].name);
1411                res = PQexec(sql_conn, sql );
1412                free(sql);
1413                sql = NULL;
1414                if(PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
1415                {
1416                    int size = PQfsize(res, 0);
1417                    if (size != sizeof(osmid_t))
1418                    {
1419                        fprintf(stderr,
1420                            "\n"
1421                            "The target database has been created with %dbit ID fields,\n"
1422                            "but this version of osm2pgsql has been compiled to use %ldbit IDs.\n"
1423                            "You cannot append data to this database with this program.\n"
1424                            "Either re-create the database or use a matching osm2pgsql.\n\n",
1425                            size * 8, sizeof(osmid_t) * 8);
1426                        exit_nicely();
1427                    }
1428                }
1429                PQclear(res);
1430            }
1431
1432            if(!options->append)
1433                build_indexes = 1;
1434        }
1435        if (dropcreate) {
1436            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s", tables[i].name);
1437        }
1438
1439        if (tables[i].start) {
1440            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].start);
1441            tables[i].transactionMode = 1;
1442        }
1443
1444        if (dropcreate && tables[i].create) {
1445            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].create);
1446            if (tables[i].create_index) {
1447              pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].create_index);
1448            }
1449        }
1450
1451
1452        if (tables[i].prepare) {
1453            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare);
1454        }
1455
1456        if (tables[i].prepare_intarray) {
1457            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].prepare_intarray);
1458        }
1459
1460        if (tables[i].copy) {
1461            pgsql_exec(sql_conn, PGRES_COPY_IN, "%s", tables[i].copy);
1462            tables[i].copyMode = 1;
1463        }
1464    }
1465
1466    return 0;
1467}
1468
1469static void pgsql_commit(void) {
1470    int i;
1471    for (i=0; i<num_tables; i++) {
1472        PGconn *sql_conn = tables[i].sql_conn;
1473        pgsql_endCopy(&tables[i]);
1474        if (tables[i].stop && tables[i].transactionMode) {
1475            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", tables[i].stop);
1476            tables[i].transactionMode = 0;
1477        }
1478    }
1479}
1480
1481static void *pgsql_stop_one(void *arg)
1482{
1483    time_t start, end;
1484
1485    struct table_desc *table = arg;
1486    PGconn *sql_conn = table->sql_conn;
1487
1488    fprintf(stderr, "Stopping table: %s\n", table->name);
1489    pgsql_endCopy(table);
1490    //if (table->stop)
1491    //    pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->stop);
1492    time(&start);
1493    if (!out_options->droptemp)
1494    {
1495        if (build_indexes && table->array_indexes) {
1496            char *buffer = (char *) malloc(strlen(table->array_indexes) + 99);
1497            // we need to insert before the TABLESPACE setting, if any
1498            char *insertpos = strstr(table->array_indexes, "TABLESPACE");
1499            if (!insertpos) insertpos = strchr(table->array_indexes, ';');
1500
1501            // automatically insert FASTUPDATE=OFF when creating,
1502            // indexes for PostgreSQL 8.4 and higher
1503            // see http://lists.openstreetmap.org/pipermail/dev/2011-January/021704.html
1504            if (insertpos && PQserverVersion(sql_conn) >= 80400) {
1505                char old = *insertpos;
1506                fprintf(stderr, "Building index on table: %s (fastupdate=off)\n", table->name);
1507                *insertpos = 0; // temporary null byte for following strcpy operation
1508                strcpy(buffer, table->array_indexes);
1509                *insertpos = old; // restore old content
1510                strcat(buffer, " WITH (FASTUPDATE=OFF)");
1511                strcat(buffer, insertpos);
1512            } else {
1513                fprintf(stderr, "Building index on table: %s\n", table->name);
1514                strcpy(buffer, table->array_indexes);
1515            }
1516            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", buffer);
1517            free(buffer);
1518        }
1519    }
1520    else
1521    {
1522        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "drop table %s", table->name);
1523    }
1524    PQfinish(sql_conn);
1525    table->sql_conn = NULL;
1526    time(&end);
1527    fprintf(stderr, "Stopped table: %s in %is\n", table->name, (int)(end - start));
1528    return NULL;
1529}
1530
1531static void pgsql_stop(void)
1532{
1533    int i;
1534#ifdef HAVE_PTHREAD
1535    pthread_t threads[num_tables];
1536#endif
1537
1538    free_node_ram_cache();
1539    if (out_options->flat_node_cache_enabled) shutdown_node_persistent_cache();
1540
1541#ifdef HAVE_PTHREAD
1542    for (i=0; i<num_tables; i++) {
1543        int ret = pthread_create(&threads[i], NULL, pgsql_stop_one, &tables[i]);
1544        if (ret) {
1545            fprintf(stderr, "pthread_create() returned an error (%d)", ret);
1546            exit_nicely();
1547        }
1548    }
1549    for (i=0; i<num_tables; i++) {
1550        int ret = pthread_join(threads[i], NULL);
1551        if (ret) {
1552            fprintf(stderr, "pthread_join() returned an error (%d)", ret);
1553            exit_nicely();
1554        }
1555    }
1556#else
1557    for (i=0; i<num_tables; i++)
1558        pgsql_stop_one(&tables[i]);
1559#endif
1560}
1561
1562struct middle_t mid_pgsql = {
1563        .start             = pgsql_start,
1564        .stop              = pgsql_stop,
1565        .cleanup           = pgsql_cleanup,
1566        .analyze           = pgsql_analyze,
1567        .end               = pgsql_end,
1568        .commit            = pgsql_commit,
1569
1570        .nodes_set         = middle_nodes_set,
1571#if 0
1572        .nodes_get         = middle_nodes_get,
1573#endif
1574        .nodes_get_list    = middle_nodes_get_list,
1575        .nodes_delete      = middle_nodes_delete,
1576        .node_changed      = pgsql_node_changed,
1577
1578        .ways_set          = pgsql_ways_set,
1579        .ways_get          = pgsql_ways_get,
1580        .ways_done         = pgsql_ways_done,
1581        .ways_delete       = pgsql_ways_delete,
1582        .way_changed       = pgsql_way_changed,
1583
1584        .relations_set     = pgsql_rels_set,
1585#if 0
1586        .relations_get     = pgsql_rels_get,
1587#endif
1588        .relations_done    = pgsql_rels_done,
1589        .relations_delete  = pgsql_rels_delete,
1590        .relation_changed  = pgsql_rel_changed,
1591#if 0
1592        .iterate_nodes     = pgsql_iterate_nodes,
1593#endif
1594        .iterate_ways      = pgsql_iterate_ways,
1595        .iterate_relations = pgsql_iterate_relations
1596};