diff --git a/lib/bgpstream_reader.c b/lib/bgpstream_reader.c index e46dcc23..0bc3012b 100644 --- a/lib/bgpstream_reader.c +++ b/lib/bgpstream_reader.c @@ -93,6 +93,11 @@ static int prefetch_record(bgpstream_reader_t *reader) // try and get the next entry from the resource (will do filtering) reader->status = bgpstream_format_populate_record(reader->format, record); + // exit with error in case of read error + if (reader->status == BGPSTREAM_FORMAT_READ_ERROR) { + return -1; + } + // if we got any of the non-error END_OF_DUMP messages but this is a stream // resource, then pretend we're ok. but beware that now we'll be "OK", with // an unfilled prefetch record diff --git a/lib/bgpstream_resource_mgr.c b/lib/bgpstream_resource_mgr.c index 86e0e025..ff09a2ca 100644 --- a/lib/bgpstream_resource_mgr.c +++ b/lib/bgpstream_resource_mgr.c @@ -53,9 +53,9 @@ struct res_list_elem { /** Is the reader open? (i.e. have we waited for it to open) */ int open; - /** Time when this resource should next be polled (if 0 then poll + /** Time in ms when this resource should next be polled (if 0 then poll immediately) */ - uint32_t next_poll; + uint64_t next_poll; /** Previous list elem */ struct res_list_elem *prev; @@ -651,7 +651,7 @@ static bgpstream_reader_status_t pop_record(bgpstream_resource_mgr_t *q, bgpstream_reader_status_t rs; struct res_list_elem *el = NULL; struct res_group *gp = NULL; - uint32_t now; + uint64_t now; uint64_t sleep_nsec; struct timespec rqtp; diff --git a/lib/formats/bs_format_rislive.c b/lib/formats/bs_format_rislive.c index 18056918..621c5042 100644 --- a/lib/formats/bs_format_rislive.c +++ b/lib/formats/bs_format_rislive.c @@ -584,7 +584,7 @@ bs_format_rislive_populate_record(bgpstream_format_t *format, return BGPSTREAM_FORMAT_CORRUPTED_DUMP; } else if (STATE->json_string_buffer_len == 0) { // end of dump - return BGPSTREAM_FORMAT_END_OF_DUMP; + return BGPSTREAM_FORMAT_READ_ERROR; } if ((rc = bs_format_process_json_fields(format, record)) != 0) {