bes: Updated for version 3.20.10
DmrppArray.cc
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of the BES
4
5// Copyright (c) 2016 OPeNDAP, Inc.
6// Author: James Gallagher <jgallagher@opendap.org>
7//
8// This library is free software; you can redistribute it and/or
9// modify it under the terms of the GNU Lesser General Public
10// License as published by the Free Software Foundation; either
11// version 2.1 of the License, or (at your option) any later version.
12//
13// This library is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16// Lesser General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public
19// License along with this library; if not, write to the Free Software
20// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21//
22// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23
24#include "config.h"
25
26#include <string>
27#include <sstream>
28#include <vector>
29#include <memory>
30#include <queue>
31#include <iterator>
32#include <thread>
33#include <future> // std::async, std::future
34#include <chrono> // std::chrono::milliseconds
35
36#include <cstring>
37#include <cassert>
38#include <cerrno>
39
40#include <pthread.h>
41#include <cmath>
42
43#include <unistd.h>
44
45#include <libdap/D4Enum.h>
46#include <libdap/D4Attributes.h>
47#include <libdap/D4Maps.h>
48#include <libdap/D4Group.h>
49
50#include "BESInternalError.h"
51#include "BESDebug.h"
52#include "BESLog.h"
53#include "BESStopWatch.h"
54
55#include "byteswap_compat.h"
56#include "CurlHandlePool.h"
57#include "Chunk.h"
58#include "DmrppArray.h"
59#include "DmrppRequestHandler.h"
60#include "DmrppNames.h"
61#include "Base64.h"
62
63// Used with BESDEBUG
64#define dmrpp_3 "dmrpp:3"
65#define dmrpp_4 "dmrpp:4"
66
67using namespace libdap;
68using namespace std;
69
70#define MB (1024*1024)
71#define prolog std::string("DmrppArray::").append(__func__).append("() - ")
72
73namespace dmrpp {
74
75
76// Transfer Thread Pool state variables.
77std::mutex transfer_thread_pool_mtx; // mutex for critical section
78atomic_uint transfer_thread_counter(0);
79
80
81
96bool get_next_future(list<std::future<bool>> &futures, atomic_uint &thread_counter, unsigned long timeout, string debug_prefix) {
97 bool future_finished = false;
98 bool done = false;
99 std::chrono::milliseconds timeout_ms (timeout);
100
101 while(!done){
102 auto futr = futures.begin();
103 auto fend = futures.end();
104 bool future_is_valid = true;
105 while(!future_finished && future_is_valid && futr != fend){
106 future_is_valid = (*futr).valid();
107 if(future_is_valid){
108 // What happens if wait_for() always returns future_status::timeout for a stuck thread?
109 // If that were to happen, the loop would run forever. However, we assume that these
110 // threads are never 'stuck.' We assume that their computations always complete, either
111 // with success or failure. For the transfer threads, timeouts will stop them if nothing
112 // else does and for the decompression threads, the worst case is a segmentation fault.
113 // jhrg 2/5/21
114 if((*futr).wait_for(timeout_ms) != std::future_status::timeout){
115 try {
116 bool success = (*futr).get();
117 future_finished = true;
118 BESDEBUG(dmrpp_3, debug_prefix << prolog << "Called future::get() on a ready future."
119 << " success: " << (success?"true":"false") << endl);
120 if(!success){
121 stringstream msg;
122 msg << debug_prefix << prolog << "The std::future has failed!";
123 msg << " thread_counter: " << thread_counter;
124 throw BESInternalError(msg.str(), __FILE__, __LINE__);
125 }
126 }
127 catch(...){
128                // TODO I had to add this to make the thread counting work when there are errors,
129                //  but it is a blunt fix because it discards every pending future. There is
130                //  surely a way to handle the situation on a per-thread basis and maybe even
131                //  retry?
132 futures.clear();
133 thread_counter=0;
134 throw;
135 }
136 }
137 else {
138 futr++;
139 BESDEBUG(dmrpp_3, debug_prefix << prolog << "future::wait_for() timed out. (timeout: " <<
140 timeout << " ms) There are currently " << futures.size() << " futures in process."
141 << " thread_counter: " << thread_counter << endl);
142 }
143 }
144 else {
145 BESDEBUG(dmrpp_3, debug_prefix << prolog << "The future was not valid. Dumping... " << endl);
146 future_finished = true;
147 }
148 }
149
150 if (futr!=fend && future_finished) {
151 futures.erase(futr);
152 thread_counter--;
153 BESDEBUG(dmrpp_3, debug_prefix << prolog << "Erased future from futures list. (Erased future was "
154 << (future_is_valid?"":"not ") << "valid at start.) There are currently " <<
155 futures.size() << " futures in process. thread_counter: " << thread_counter << endl);
156 }
157
158 done = future_finished || futures.empty();
159 }
160
161 return future_finished;
162}
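
// Usage sketch (assumed; it mirrors the loops later in this file): callers retire
// one completed future at a time with get_next_future() and then try to start more
// work, repeating until both the work queue and the futures list are empty.
//
//     list<future<bool>> futures;
//     bool done = false;
//     while (!done) {
//         if (!futures.empty())
//             get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
//         // ... start new transfer threads while the thread pool has capacity ...
//         done = <no work queued> && futures.empty();
//     }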
163
164
165
176bool one_child_chunk_thread_new(unique_ptr<one_child_chunk_args_new> args)
177{
178
179 args->child_chunk->read_chunk();
180
181 assert(args->the_one_chunk->get_rbuf());
182 assert(args->child_chunk->get_rbuf());
183 assert(args->child_chunk->get_bytes_read() == args->child_chunk->get_size());
184
185 // the_one_chunk offset \/
186 // the_one_chunk: mmmmmmmmmmmmmmmm
187 // child chunks: 1111222233334444 (there are four child chunks)
188 // child offsets: ^ ^ ^ ^
189 // For this example, child_1_offset - the_one_chunk_offset == 0 (that's always true)
190    // child_2_offset - the_one_chunk_offset == 4; child_3_offset - the_one_chunk_offset == 8;
191    // and child_4_offset - the_one_chunk_offset == 12.
192    // Those are the starting locations within the data buffer of the_one_chunk
193 // where that child chunk should be written.
194 // Note: all the offset values start at the beginning of the file.
195
196 unsigned long long offset_within_the_one_chunk = args->child_chunk->get_offset() - args->the_one_chunk->get_offset();
197
198 memcpy(args->the_one_chunk->get_rbuf() + offset_within_the_one_chunk, args->child_chunk->get_rbuf(),
199 args->child_chunk->get_bytes_read());
200
201 return true;
202}
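
// Given the scheme above, the memcpy() target is always
// the_one_chunk->get_rbuf() + (child_offset - the_one_chunk_offset). For example
// (illustrative), the third child chunk of a 4 MB contiguous variable split into
// 1 MB pieces lands at byte offset 2 MB within the_one_chunk's buffer.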
203
204
205
211bool one_super_chunk_transfer_thread(unique_ptr<one_super_chunk_args> args)
212{
213
214#if DMRPP_ENABLE_THREAD_TIMERS
215 stringstream timer_tag;
216 timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
217 " parent_tid: 0x" << std::hex << args->parent_thread_id << " sc_id: " << args->super_chunk->id();
218 BESStopWatch sw(TRANSFER_THREADS);
219 sw.start(timer_tag.str());
220#endif
221
222 args->super_chunk->read();
223 return true;
224}
225
231bool one_super_chunk_unconstrained_transfer_thread(unique_ptr<one_super_chunk_args> args)
232{
233
234#if DMRPP_ENABLE_THREAD_TIMERS
235 stringstream timer_tag;
236 timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
237 " parent_tid: 0x" << std::hex << args->parent_thread_id << " sc_id: " << args->super_chunk->id();
238 BESStopWatch sw(TRANSFER_THREADS);
239 sw.start(timer_tag.str());
240#endif
241
242 args->super_chunk->read_unconstrained();
243 return true;
244}
245
246
247bool start_one_child_chunk_thread(list<std::future<bool>> &futures, unique_ptr<one_child_chunk_args_new> args) {
248 bool retval = false;
249 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
250 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
251 transfer_thread_counter++;
252        BESDEBUG(dmrpp_3, prolog << "Getting std::future '" << futures.size() + 1 <<
253            "' from std::async for " << args->child_chunk->to_string() << endl);
254        futures.push_back( std::async(std::launch::async, one_child_chunk_thread_new, std::move(args)));
255        retval = true;
256 }
257 return retval;
258}
259
260
268bool start_super_chunk_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
269 bool retval = false;
270 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
271 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
272 transfer_thread_counter++;
273        BESDEBUG(dmrpp_3, prolog << "Getting std::future '" << futures.size() + 1 <<
274            "' from std::async for " << args->super_chunk->to_string(false) << endl);
275        futures.push_back(std::async(std::launch::async, one_super_chunk_transfer_thread, std::move(args)));
276        retval = true;
277 }
278 return retval;
279}
280
288bool start_super_chunk_unconstrained_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
289 bool retval = false;
290 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
291 if(transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
292 transfer_thread_counter++;
293 futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_transfer_thread, std::move(args)));
294 retval = true;
295 BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<
296 "' from std::async, transfer_thread_counter: " << transfer_thread_counter << endl);
297 }
298 return retval;
299}
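
// All three start_*_thread() functions above share the same guarded-start pattern:
// take transfer_thread_pool_mtx and, only if transfer_thread_counter is below
// DmrppRequestHandler::d_max_transfer_threads, increment the counter and hand the
// argument block to std::async(std::launch::async, ...). A false return means the
// pool was full; the caller leaves the work item queued and retries after
// get_next_future() retires a future (which decrements the counter). Note that any
// logging that dereferences 'args' happens before std::move(args) hands it off,
// since the move empties the unique_ptr.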
300
301
322void read_super_chunks_unconstrained_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
323{
324 BESStopWatch sw;
325 if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: "+array->name(), "");
326
327    // Parallel version of read_chunks_unconstrained(). There is substantial
328    // duplication of the code in read_chunks_unconstrained(); consolidating
329    // the two remains a cleanup task now that the code uses C++11 threads.
330
331 // We maintain a list of futures to track our parallel activities.
332 list<future<bool>> futures;
333 try {
334 bool done = false;
335 bool future_finished = true;
336 while (!done) {
337
338 if(!futures.empty())
339 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
340
341        // If future_finished is true this means that the transfer_thread_counter has been decremented,
342 // because future::get() was called or a call to future::valid() returned false.
343 BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);
344
345 if (!super_chunks.empty()){
346 // Next we try to add a new Chunk compute thread if we can - there might be room.
347 bool thread_started = true;
348 while(thread_started && !super_chunks.empty()) {
349 auto super_chunk = super_chunks.front();
350 BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);
351
352 auto args = unique_ptr<one_super_chunk_args>(new one_super_chunk_args(super_chunk, array));
353 thread_started = start_super_chunk_unconstrained_transfer_thread(futures, std::move(args));
354
355 if (thread_started) {
356 super_chunks.pop();
357 BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
358 } else {
359 // Thread did not start, ownership of the arguments was not passed to the thread.
360                        BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue." <<
361 " transfer_thread_counter: " << transfer_thread_counter <<
362 " futures.size(): " << futures.size() << endl);
363 }
364 }
365 }
366 else {
367 // No more Chunks and no futures means we're done here.
368 if(futures.empty())
369 done = true;
370 }
371 future_finished = false;
372 }
373 }
374 catch (...) {
375 // Complete all the futures, otherwise we'll have threads out there using up resources
376 while(!futures.empty()){
377 if(futures.back().valid())
378 futures.back().get();
379 futures.pop_back();
380 }
381 // re-throw the exception
382 throw;
383 }
384}
385
386
387
388
409void read_super_chunks_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
410{
411 BESStopWatch sw;
412 if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: "+array->name(), "");
413
414    // Parallel version of read_chunks(). There is substantial duplication
415    // of the code in read_chunks(); consolidating the two remains a cleanup
416    // task now that the code uses C++11 threads.
417
418 // We maintain a list of futures to track our parallel activities.
419 list<future<bool>> futures;
420 try {
421 bool done = false;
422 bool future_finished = true;
423 while (!done) {
424
425 if(!futures.empty())
426 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
427
428        // If future_finished is true this means that the transfer_thread_counter has been decremented,
429 // because future::get() was called or a call to future::valid() returned false.
430 BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);
431
432 if (!super_chunks.empty()){
433 // Next we try to add a new Chunk compute thread if we can - there might be room.
434 bool thread_started = true;
435 while(thread_started && !super_chunks.empty()) {
436 auto super_chunk = super_chunks.front();
437 BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);
438
439 auto args = unique_ptr<one_super_chunk_args>(new one_super_chunk_args(super_chunk, array));
440 thread_started = start_super_chunk_transfer_thread(futures, std::move(args));
441
442 if (thread_started) {
443 super_chunks.pop();
444 BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
445 } else {
446 // Thread did not start, ownership of the arguments was not passed to the thread.
447                        BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue." <<
448 " transfer_thread_counter: " << transfer_thread_counter <<
449 " futures.size(): " << futures.size() << endl);
450 }
451 }
452 }
453 else {
454 // No more Chunks and no futures means we're done here.
455 if(futures.empty())
456 done = true;
457 }
458 future_finished = false;
459 }
460 }
461 catch (...) {
462 // Complete all the futures, otherwise we'll have threads out there using up resources
463 while(!futures.empty()){
464 if(futures.back().valid())
465 futures.back().get();
466 futures.pop_back();
467 }
468 // re-throw the exception
469 throw;
470 }
471}
472
491static unsigned long long
492get_index(const vector<unsigned long long> &address_in_target, const vector<unsigned long long> &target_shape)
493{
494 assert(address_in_target.size() == target_shape.size()); // ranks must be equal
495
496 auto shape_index = target_shape.rbegin();
497 auto index = address_in_target.rbegin(), index_end = address_in_target.rend();
498
499 unsigned long long multiplier_var = *shape_index++;
500 unsigned long long offset = *index++;
501
502 while (index != index_end) {
503 assert(*index < *shape_index); // index < shape for each dim
504
505 offset += multiplier_var * *index++;
506 multiplier_var *= *shape_index++;
507 }
508
509 return offset;
510}
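
// A minimal sanity check for get_index(), assuming a row-major 3x4x5 array
// (illustrative only; not part of the original source):
//
//     vector<unsigned long long> shape{3, 4, 5};
//     vector<unsigned long long> addr{2, 1, 3};
//     // offset = 3 + (1 * 5) + (2 * 4 * 5) == 48
//     assert(get_index(addr, shape) == 48);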
511
514
528static unsigned long multiplier(const vector<unsigned long long> &shape, unsigned int k)
529{
530 assert(shape.size() > 1);
531 assert(shape.size() > k + 1);
532
533 vector<unsigned long long>::const_iterator i = shape.begin(), e = shape.end();
534 advance(i, k + 1);
535 unsigned long multiplier = *i++;
536 while (i != e) {
537 multiplier *= *i++;
538 }
539
540 return multiplier;
541}
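
// Illustration of multiplier(), assuming the same row-major 3x4x5 shape: one step
// along dimension 0 spans 4 * 5 == 20 elements and one step along dimension 1
// spans 5 elements, so multiplier(shape, 0) == 20 and multiplier(shape, 1) == 5.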
542
543//#####################################################################################################################
544//
545// DmrppArray code begins here.
546//
547// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
548
549DmrppArray &
550DmrppArray::operator=(const DmrppArray &rhs)
551{
552 if (this == &rhs) return *this;
553
554    dynamic_cast<Array &>(*this) = rhs; // run Array::operator=
555
556 dynamic_cast<DmrppCommon &>(*this) = rhs;
557 // Removed DmrppCommon::m_duplicate_common(rhs); jhrg 11/12/21
558
559 return *this;
560}
561
566bool DmrppArray::is_projected()
567{
568 for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
569 if (dimension_size(p, true) != dimension_size(p, false)) return true;
570
571 return false;
572}
573
580unsigned long long DmrppArray::get_size(bool constrained)
581{
582 // number of array elements in the constrained array
583 unsigned long long size = 1;
584 for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
585 size *= dimension_size(dim, constrained);
586 }
587 return size;
588}
589
596vector<unsigned long long> DmrppArray::get_shape(bool constrained)
597{
598 auto dim = dim_begin(), edim = dim_end();
599 vector<unsigned long long> shape;
600
601 // For a 3d array, this method took 14ms without reserve(), 5ms with
602 // (when called many times).
603 shape.reserve(edim - dim);
604
605 for (; dim != edim; dim++) {
606 shape.push_back(dimension_size(dim, constrained));
607 }
608
609 return shape;
610}
611
617DmrppArray::dimension DmrppArray::get_dimension(unsigned int i)
618{
619    assert(i < (dim_end() - dim_begin()));
620 return *(dim_begin() + i);
621}
622
625
636void DmrppArray::insert_constrained_contiguous(Dim_iter dim_iter, unsigned long *target_index,
637 vector<unsigned long long> &subset_addr,
638 const vector<unsigned long long> &array_shape, char /*Chunk*/*src_buf)
639{
640 BESDEBUG("dmrpp", "DmrppArray::" << __func__ << "() - subsetAddress.size(): " << subset_addr.size() << endl);
641
642 unsigned int bytes_per_elem = prototype()->width();
643
644 char *dest_buf = get_buf();
645
646 unsigned int start = this->dimension_start(dim_iter, true);
647 unsigned int stop = this->dimension_stop(dim_iter, true);
648 unsigned int stride = this->dimension_stride(dim_iter, true);
649
650 dim_iter++;
651
652    // The end case for the recursion is dim_iter == dim_end(); stride == 1 is an optimization.
653 // See the else clause for the general case.
654 if (dim_iter == dim_end() && stride == 1) {
655 // For the start and stop indexes of the subset, get the matching indexes in the whole array.
656 subset_addr.push_back(start);
657 unsigned long long start_index = get_index(subset_addr, array_shape);
658 subset_addr.pop_back();
659
660 subset_addr.push_back(stop);
661 unsigned long long stop_index = get_index(subset_addr, array_shape);
662 subset_addr.pop_back();
663
664 // Copy data block from start_index to stop_index
665 // TODO Replace this loop with a call to std::memcpy()
666 for (unsigned long source_index = start_index; source_index <= stop_index; source_index++) {
667 unsigned long target_byte = *target_index * bytes_per_elem;
668 unsigned long source_byte = source_index * bytes_per_elem;
669 // Copy a single value.
670 for (unsigned long i = 0; i < bytes_per_elem; i++) {
671 dest_buf[target_byte++] = src_buf[source_byte++];
672 }
673 (*target_index)++;
674 }
675
676 }
677 else {
678 for (unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
679
680 // Is it the last dimension?
681 if (dim_iter != dim_end()) {
682 // Nope! Then we recurse to the last dimension to read stuff
683 subset_addr.push_back(myDimIndex);
684 insert_constrained_contiguous(dim_iter, target_index, subset_addr, array_shape, src_buf);
685 subset_addr.pop_back();
686 }
687 else {
688 // We are at the last (innermost) dimension, so it's time to copy values.
689 subset_addr.push_back(myDimIndex);
690 unsigned int sourceIndex = get_index(subset_addr, array_shape);
691 subset_addr.pop_back();
692
693 // Copy a single value.
694 unsigned long target_byte = *target_index * bytes_per_elem;
695 unsigned long source_byte = sourceIndex * bytes_per_elem;
696
697 for (unsigned int i = 0; i < bytes_per_elem; i++) {
698 dest_buf[target_byte++] = src_buf[source_byte++];
699 }
700 (*target_index)++;
701 }
702 }
703 }
704}
705
722void DmrppArray::read_contiguous()
723{
724 BESStopWatch sw;
725 if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: "+name(), "");
726
727 // Get the single chunk that makes up this CONTIGUOUS variable.
728 if (get_chunks_size() != 1)
729 throw BESInternalError(string("Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
730
731 // This is the original chunk for this 'contiguous' variable.
732 auto the_one_chunk = get_immutable_chunks()[0];
733
734 unsigned long long the_one_chunk_offset = the_one_chunk->get_offset();
735 unsigned long long the_one_chunk_size = the_one_chunk->get_size();
736
737 // We only want to read in the Chunk concurrently if:
738 // - Concurrent transfers are enabled (DmrppRequestHandler::d_use_transfer_threads)
739    //   - The variable's size is above the threshold value held in DmrppRequestHandler::d_contiguous_concurrent_threshold
740 if (!DmrppRequestHandler::d_use_transfer_threads || the_one_chunk_size <= DmrppRequestHandler::d_contiguous_concurrent_threshold) {
741 // Read the the_one_chunk as is. This is the non-parallel I/O case
742 the_one_chunk->read_chunk();
743 }
744 else {
745
746 // Allocate memory for the 'the_one_chunk' so the transfer threads can transfer data
747 // from the child chunks to it.
748 the_one_chunk->set_rbuf_to_size();
749
750        // The number of child chunks is determined by the size of the data.
751        // If the size of the_one_chunk is 3 MB, then 3 chunks will be made. We round down
752        // when necessary and handle the remainder later on (3.3 MB = 3 chunks, 4.2 MB = 4 chunks, etc.)
753 unsigned long long num_chunks = floor(the_one_chunk_size / MB);
754 if (num_chunks >= DmrppRequestHandler::d_max_transfer_threads)
755 num_chunks = DmrppRequestHandler::d_max_transfer_threads;
756
757 // Use the original chunk's size and offset to evenly split it into smaller chunks
758 unsigned long long chunk_size = the_one_chunk_size / num_chunks;
759 std::string chunk_byteorder = the_one_chunk->get_byte_order();
760
761 // If the size of the the_one_chunk is not evenly divisible by num_chunks, capture
762 // the remainder here and increase the size of the last chunk by this number of bytes.
763 unsigned long long chunk_remainder = the_one_chunk_size % num_chunks;
764
765 auto chunk_url = the_one_chunk->get_data_url();
766
767 // Set up a queue to break up the original the_one_chunk and keep track of the pieces
768 queue<shared_ptr<Chunk>> chunks_to_read;
769
770 // Make the Chunk objects
771 unsigned long long chunk_offset = the_one_chunk_offset;
772 for (unsigned int i = 0; i < num_chunks - 1; i++) {
773 chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size, chunk_offset)));
774 chunk_offset += chunk_size;
775 }
776 // Make the remainder Chunk, see above for details.
777 chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size + chunk_remainder, chunk_offset)));
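        // Worked example (illustrative): for the_one_chunk_size = 4404019 bytes
        // (~4.2 MB) with d_max_transfer_threads >= 4, num_chunks = 4 and
        // chunk_size = 4404019 / 4 = 1101004. The loop above queues three chunks
        // of 1101004 bytes; the remainder chunk gets 1101004 + (4404019 % 4) =
        // 1101007 bytes, so the four reads exactly tile the original chunk.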
778
779 // We maintain a list of futures to track our parallel activities.
780 list<future<bool>> futures;
781 try {
782 bool done = false;
783 bool future_finished = true;
784 while (!done) {
785
786 if (!futures.empty())
787 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
788
789                // If future_finished is true this means that the transfer_thread_counter has been decremented,
790 // because future::get() was called or a call to future::valid() returned false.
791 BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);
792
793 if (!chunks_to_read.empty()) {
794 // Next we try to add a new Chunk compute thread if we can - there might be room.
795 bool thread_started = true;
796 while (thread_started && !chunks_to_read.empty()) {
797 auto current_chunk = chunks_to_read.front();
798 BESDEBUG(dmrpp_3, prolog << "Starting thread for " << current_chunk->to_string() << endl);
799
800 auto args = unique_ptr<one_child_chunk_args_new>(new one_child_chunk_args_new(current_chunk, the_one_chunk));
801 thread_started = start_one_child_chunk_thread(futures, std::move(args));
802
803 if (thread_started) {
804 chunks_to_read.pop();
805 BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << current_chunk->to_string() << endl);
806 } else {
807 // Thread did not start, ownership of the arguments was not passed to the thread.
808                                BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue." <<
809 " transfer_thread_counter: " << transfer_thread_counter <<
810 " futures.size(): " << futures.size() << endl);
811 }
812 }
813 } else {
814 // No more Chunks and no futures means we're done here.
815 if (futures.empty())
816 done = true;
817 }
818 future_finished = false;
819 }
820 }
821 catch (...) {
822 // Complete all the futures, otherwise we'll have threads out there using up resources
823 while (!futures.empty()) {
824 if (futures.back().valid())
825 futures.back().get();
826 futures.pop_back();
827 }
828 // re-throw the exception
829 throw;
830 }
831 }
832
833 // Now that the_one_chunk has been read, we do what is necessary...
834 if (!is_filters_empty()){
835 the_one_chunk->filter_chunk(get_filters(), get_chunk_size_in_elements(),var()->width());
836 }
837
838 // The 'the_one_chunk' now holds the data values. Transfer it to the Array.
839 if (!is_projected()) { // if there is no projection constraint
840 reserve_value_capacity(get_size(false));
841 val2buf(the_one_chunk->get_rbuf()); // yes, it's not type-safe
842 }
843 else { // apply the constraint
844 vector<unsigned long long> array_shape = get_shape(false);
845
846 // Reserve space in this array for the constrained size of the data request
847 reserve_value_capacity(get_size(true));
848 unsigned long target_index = 0;
849 vector<unsigned long long> subset;
850
851 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, the_one_chunk->get_rbuf());
852 }
853
854 set_read_p(true);
855}
856
876void DmrppArray::insert_chunk_unconstrained(shared_ptr<Chunk> chunk, unsigned int dim, unsigned long long array_offset,
877 const vector<unsigned long long> &array_shape,
878 unsigned long long chunk_offset, const vector<unsigned long long> &chunk_shape,
879 const vector<unsigned long long> &chunk_origin)
880{
881 // Now we figure out the correct last element. It's possible that a
882 // chunk 'extends beyond' the Array bounds. Here 'end_element' is the
883 // last element of the destination array
884 dimension thisDim = this->get_dimension(dim);
885 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
886 if ((unsigned) thisDim.stop < end_element) {
887 end_element = thisDim.stop;
888 }
889
890 unsigned long long chunk_end = end_element - chunk_origin[dim];
891
892 unsigned int last_dim = chunk_shape.size() - 1;
893 if (dim == last_dim) {
894 unsigned int elem_width = prototype()->width();
895
896 array_offset += chunk_origin[dim];
897
898 // Compute how much we are going to copy
899 unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
900 char *source_buffer = chunk->get_rbuf();
901 char *target_buffer = get_buf();
902 memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
903 }
904 else {
905 unsigned long mc = multiplier(chunk_shape, dim);
906 unsigned long ma = multiplier(array_shape, dim);
907
908 // Not the last dimension, so we continue to proceed down the Recursion Branch.
909 for (unsigned int chunk_index = 0 /*chunk_start*/; chunk_index <= chunk_end; ++chunk_index) {
910 unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
911 unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));
912
913 // Re-entry here:
914 insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape,
915 chunk_origin);
916 }
917 }
918}
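
// Illustration of the recursion above (assumed shapes): a 10x10 array stored as
// 5x5 chunks. For the chunk whose origin is {5, 5}, dim 0 iterates chunk rows
// 0..4; row r is passed down with array offset ma * (r + 5), where
// ma = multiplier(array_shape, 0) == 10. The dim 1 call (the last dimension)
// then memcpy()s one 5-element row into the destination buffer.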
919
931void DmrppArray::read_chunks_unconstrained()
932{
933 if (get_chunks_size() < 2)
934        throw BESInternalError(string("Expected two or more chunks for variable ") + name(), __FILE__, __LINE__);
935
936 // Find all the required chunks to read. I used a queue to preserve the chunk order, which
937 // made using a debugger easier. However, order does not matter, AFAIK.
938
939 unsigned long long sc_count=0;
940 stringstream sc_id;
941 sc_id << name() << "-" << sc_count++;
942 queue<shared_ptr<SuperChunk>> super_chunks;
943 auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this)) ;
944 super_chunks.push(current_super_chunk);
945
946 // Make the SuperChunks using all the chunks.
947 for(const auto& chunk: get_immutable_chunks()){
948 bool added = current_super_chunk->add_chunk(chunk);
949 if(!added){
950 sc_id.str(std::string());
951 sc_id << name() << "-" << sc_count++;
952 current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this));
953 super_chunks.push(current_super_chunk);
954 if(!current_super_chunk->add_chunk(chunk)){
955 stringstream msg ;
956 msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
957 throw BESInternalError(msg.str(), __FILE__, __LINE__);
958 }
959 }
960 }
961 reserve_value_capacity(get_size());
962    // The size, in elements, of each of the array's dimensions
963 const vector<unsigned long long> array_shape = get_shape(true);
964 // The size, in elements, of each of the chunk's dimensions
965 const vector<unsigned long long> chunk_shape = get_chunk_dimension_sizes();
966
967
968 BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
969 BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
970
971 if (!DmrppRequestHandler::d_use_transfer_threads) { // Serial transfers
972#if DMRPP_ENABLE_THREAD_TIMERS
973 BESStopWatch sw(dmrpp_3);
974 sw.start(prolog + "Serial SuperChunk Processing.");
975#endif
976 while(!super_chunks.empty()) {
977 auto super_chunk = super_chunks.front();
978 super_chunks.pop();
979 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl );
980 // FIXME Since this is read_chunks_unconstrained, should call SuperChunk::read_unconstrained()
981 // jhrg 11/19/21
982 super_chunk->read();
983 }
984 }
985 else { // Parallel transfers
986#if DMRPP_ENABLE_THREAD_TIMERS
987 stringstream timer_name;
988 timer_name << prolog << "Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
989 BESStopWatch sw(dmrpp_3);
990 sw.start(timer_name.str());
991#endif
992 read_super_chunks_unconstrained_concurrent(super_chunks, this);
993 }
994 set_read_p(true);
995}
996
997
1000
1013unsigned long long DmrppArray::get_chunk_start(const dimension &thisDim, unsigned int chunk_origin)
1014{
1015 // What's the first element that we are going to access for this dimension of the chunk?
1016 unsigned long long first_element_offset = 0; // start with 0
1017 if ((unsigned) (thisDim.start) < chunk_origin) {
1018        // If the constraint's start falls before this chunk, then it's special.
1019 if (thisDim.stride != 1) {
1020 // And if the stride isn't 1, we have to figure our where to begin in this chunk.
1021 first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
1022 // If it's zero great!
1023 if (first_element_offset != 0) {
1024 // otherwise, adjust to get correct first element.
1025 first_element_offset = thisDim.stride - first_element_offset;
1026 }
1027 }
1028 }
1029 else {
1030 first_element_offset = thisDim.start - chunk_origin;
1031 }
1032
1033 return first_element_offset;
1034}
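
// Worked example (illustrative): a constraint with start == 3 and stride == 4
// against a chunk whose origin along this dimension is 10. Since start (3) falls
// before the chunk, the strided elements are 3, 7, 11, ... and the first one
// inside the chunk is 11: (chunk_origin - start) % stride == (10 - 3) % 4 == 3,
// so first_element_offset == 4 - 3 == 1, and chunk element 1 is global element
// 11, as expected.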
1035
1057shared_ptr<Chunk>
1058DmrppArray::find_needed_chunks(unsigned int dim, vector<unsigned long long> *target_element_address, shared_ptr<Chunk> chunk)
1059{
1060 BESDEBUG(dmrpp_3, prolog << " BEGIN, dim: " << dim << endl);
1061
1062 // The size, in elements, of each of the chunk's dimensions.
1063 const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();
1064
1065 // The chunk's origin point a.k.a. its "position in array".
1066 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
1067
1068 dimension thisDim = this->get_dimension(dim);
1069
1070 // Do we even want this chunk?
1071 if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) ||
1072 (unsigned) thisDim.stop < chunk_origin[dim]) {
1073 return nullptr; // No. No, we do not. Skip this chunk.
1074 }
1075
1076 // What's the first element that we are going to access for this dimension of the chunk?
1077 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
1078
1079 // Is the next point to be sent in this chunk at all? If no, return.
1080 if (chunk_start > chunk_shape[dim]) {
1081 return nullptr;
1082 }
1083
1084 // Now we figure out the correct last element, based on the subset expression
1085 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1086 if ((unsigned) thisDim.stop < end_element) {
1087 end_element = thisDim.stop;
1088 }
1089
1090 unsigned long long chunk_end = end_element - chunk_origin[dim];
1091
1092 unsigned int last_dim = chunk_shape.size() - 1;
1093 if (dim == last_dim) {
1094 BESDEBUG(dmrpp_3, prolog << " END, This is the last_dim. chunk: " << chunk->to_string() << endl);
1095 return chunk;
1096 }
1097 else {
1098 // Not the last dimension, so we continue to proceed down the Recursion Branch.
1099 for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1100 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1101
1102 // Re-entry here:
1103 auto needed = find_needed_chunks(dim + 1, target_element_address, chunk);
1104 if (needed){
1105 BESDEBUG(dmrpp_3, prolog << " END, Found chunk: " << needed->to_string() << endl);
1106 return needed;
1107 }
1108
1109 }
1110 }
1111 BESDEBUG(dmrpp_3, prolog << " END, dim: " << dim << endl);
1112
1113 return nullptr;
1114}
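
// Note on use: find_needed_chunks() walks the constraint recursively and returns
// the chunk itself as soon as the innermost dimension is reached with at least one
// selected element, or nullptr if the constraint misses the chunk entirely.
// read_chunks() below uses that answer to decide which chunks join a SuperChunk.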
1115
1135void DmrppArray::insert_chunk(
1136 unsigned int dim,
1137 vector<unsigned long long> *target_element_address,
1138 vector<unsigned long long> *chunk_element_address,
1139 shared_ptr<Chunk> chunk,
1140 const vector<unsigned long long> &constrained_array_shape){
1141
1142 // The size, in elements, of each of the chunk's dimensions.
1143 const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();
1144
1145 // The chunk's origin point a.k.a. its "position in array".
1146 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
1147
1148 dimension thisDim = this->get_dimension(dim);
1149
1150 // What's the first element that we are going to access for this dimension of the chunk?
1151 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
1152
1153 // Now we figure out the correct last element, based on the subset expression
1154 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1155 if ((unsigned) thisDim.stop < end_element) {
1156 end_element = thisDim.stop;
1157 }
1158
1159 unsigned long long chunk_end = end_element - chunk_origin[dim];
1160
1161 unsigned int last_dim = chunk_shape.size() - 1;
1162 if (dim == last_dim) {
1163 char *source_buffer = chunk->get_rbuf();
1164 char *target_buffer = get_buf();
1165 unsigned int elem_width = prototype()->width();
1166
1167 if (thisDim.stride == 1) {
1168 // The start element in this array
1169 unsigned long long start_element = chunk_origin[dim] + chunk_start;
1170 // Compute how much we are going to copy
1171 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1172
1173 // Compute where we need to put it.
1174 (*target_element_address)[dim] = (start_element - thisDim.start); // / thisDim.stride;
1175 // Compute where we are going to read it from
1176 (*chunk_element_address)[dim] = chunk_start;
1177
1178 // See below re get_index()
1179 unsigned long long target_char_start_index =
1180 get_index(*target_element_address, constrained_array_shape) * elem_width;
1181 unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1182
1183 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index,
1184 chunk_constrained_inner_dim_bytes);
1185 }
1186 else {
1187 // Stride != 1
1188 for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1189 // Compute where we need to put it.
1190 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1191
1192 // Compute where we are going to read it from
1193 (*chunk_element_address)[dim] = chunk_index;
1194
1195 // These calls to get_index() can be removed as with the insert...unconstrained() code.
1196 unsigned int target_char_start_index =
1197 get_index(*target_element_address, constrained_array_shape) * elem_width;
1198 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1199
1200 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
1201 }
1202 }
1203 }
1204 else {
1205 // Not the last dimension, so we continue to proceed down the Recursion Branch.
1206 for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1207 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1208 (*chunk_element_address)[dim] = chunk_index;
1209
1210 // Re-entry here:
1211 insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
1212 }
1213 }
1214}
1215
1222void DmrppArray::read_chunks()
1223{
1224 if (get_chunks_size() < 2)
1225        throw BESInternalError(string("Expected two or more chunks for variable ") + name(), __FILE__, __LINE__);
1226
1227 // Find all the required chunks to read. I used a queue to preserve the chunk order, which
1228 // made using a debugger easier. However, order does not matter, AFAIK.
1229 unsigned long long sc_count=0;
1230 stringstream sc_id;
1231 sc_id << name() << "-" << sc_count++;
1232 queue<shared_ptr<SuperChunk>> super_chunks;
1233 auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(), this)) ;
1234 super_chunks.push(current_super_chunk);
1235
1236 // TODO We know that non-contiguous chunks may be forward or backward in the file from
1237 // the current offset. When an add_chunk() call fails, prior to making a new SuperChunk
1238    //  we might want to try adding the rejected Chunk to the other existing SuperChunks to see
1239 // if it's contiguous there.
1240 // Find the required Chunks and put them into SuperChunks.
1241 for(const auto& chunk: get_immutable_chunks()){
1242 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
1243 auto needed = find_needed_chunks(0 /* dimension */, &target_element_address, chunk);
1244 if (needed){
1245 bool added = current_super_chunk->add_chunk(chunk);
1246 if(!added){
1247 sc_id.str(std::string()); // Clears stringstream.
1248 sc_id << name() << "-" << sc_count++;
1249 current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(),this));
1250 super_chunks.push(current_super_chunk);
1251 if(!current_super_chunk->add_chunk(chunk)){
1252 stringstream msg ;
1253 msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
1254 throw BESInternalError(msg.str(), __FILE__, __LINE__);
1255 }
1256 }
1257 }
1258 }
1259
1260 reserve_value_capacity(get_size(true));
1261
1262 BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
1263 BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
1264 BESDEBUG(dmrpp_3, prolog << "d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ? "true" : "false") << endl);
1265 BESDEBUG(dmrpp_3, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
1266 BESDEBUG(dmrpp_3, prolog << "SuperChunks.size(): " << super_chunks.size() << endl);
1267
1268 if (!DmrppRequestHandler::d_use_transfer_threads) {
1269 // This version is the 'serial' version of the code. It reads a chunk, inserts it,
1270 // reads the next one, and so on.
1271#if DMRPP_ENABLE_THREAD_TIMERS
1272 BESStopWatch sw(dmrpp_3);
1273 sw.start(prolog + "Serial SuperChunk Processing.");
1274#endif
1275 while (!super_chunks.empty()) {
1276 auto super_chunk = super_chunks.front();
1277 super_chunks.pop();
1278 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl );
1279 super_chunk->read();
1280 }
1281 }
1282 else {
1283#if DMRPP_ENABLE_THREAD_TIMERS
1284 stringstream timer_name;
1285 timer_name << prolog << "Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
1286 BESStopWatch sw(dmrpp_3);
1287 sw.start(timer_name.str());
1288#endif
1289 read_super_chunks_concurrent(super_chunks, this);
1290 }
1291 set_read_p(true);
1292}
1293
1294
1295#ifdef USE_READ_SERIAL
1317void DmrppArray::insert_chunk_serial(unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
1318 Chunk *chunk)
1319{
1320 BESDEBUG("dmrpp", __func__ << " dim: "<< dim << " BEGIN "<< endl);
1321
1322 // The size, in elements, of each of the chunk's dimensions.
1323 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
1324
1325 // The chunk's origin point a.k.a. its "position in array".
1326 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
1327
1328 dimension thisDim = this->get_dimension(dim);
1329
1330 // Do we even want this chunk?
1331 if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (unsigned) thisDim.stop < chunk_origin[dim]) {
1332 return; // No. No, we do not. Skip this.
1333 }
1334
1335 // What's the first element that we are going to access for this dimension of the chunk?
1336    unsigned int first_element_offset = get_chunk_start(thisDim, chunk_origin[dim]);
1337
1338 // Is the next point to be sent in this chunk at all? If no, return.
1339 if (first_element_offset > chunk_shape[dim]) {
1340 return;
1341 }
1342
1343 // Now we figure out the correct last element, based on the subset expression
1344 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1345 if ((unsigned) thisDim.stop < end_element) {
1346 end_element = thisDim.stop;
1347 }
1348
1349 unsigned long long chunk_start = first_element_offset; //start_element - chunk_origin[dim];
1350 unsigned long long chunk_end = end_element - chunk_origin[dim];
1351 vector<unsigned int> constrained_array_shape = get_shape(true);
1352
1353 unsigned int last_dim = chunk_shape.size() - 1;
1354 if (dim == last_dim) {
1355 // Read and Process chunk
1356 chunk->read_chunk();
1357
1358 chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());
1359
1360 char *source_buffer = chunk->get_rbuf();
1361 char *target_buffer = get_buf();
1362 unsigned int elem_width = prototype()->width();
1363
1364 if (thisDim.stride == 1) {
1365 // The start element in this array
1366 unsigned long long start_element = chunk_origin[dim] + first_element_offset;
1367 // Compute how much we are going to copy
1368 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1369
1370 // Compute where we need to put it.
1371 (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
1372 // Compute where we are going to read it from
1373 (*chunk_element_address)[dim] = first_element_offset;
1374
1375 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1376 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1377
1378 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
1379 }
1380 else {
1381 // Stride != 1
1382 for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1383 // Compute where we need to put it.
1384 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1385
1386 // Compute where we are going to read it from
1387 (*chunk_element_address)[dim] = chunk_index;
1388
1389 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1390 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1391
1392 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
1393 }
1394 }
1395 }
1396 else {
1397 // Not the last dimension, so we continue to proceed down the Recursion Branch.
1398 for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1399 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1400 (*chunk_element_address)[dim] = chunk_index;
1401
1402 // Re-entry here:
1403 insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
1404 }
1405 }
1406}
1407
1408void DmrppArray::read_chunks_serial()
1409{
1410 BESDEBUG("dmrpp", __func__ << " for variable '" << name() << "' - BEGIN" << endl);
1411
1412 vector<Chunk> &chunk_refs = get_chunk_vec();
1413 if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
1414
1415 // Allocate target memory.
1416 reserve_value_capacity(get_size(true));
1417
1418 /*
1419 * Find the chunks to be read, make curl_easy handles for them, and
1420 * stuff them into our curl_multi handle. This is a recursive activity
1421 * which utilizes the same code that copies the data from the chunk to
1422 * the variables.
1423 */
1424 for (unsigned long i = 0; i < chunk_refs.size(); i++) {
1425 Chunk &chunk = chunk_refs[i];
1426
1427 vector<unsigned int> chunk_source_address(dimensions(), 0);
1428 vector<unsigned int> target_element_address = chunk.get_position_in_array();
1429
1430 // Recursive insertion operation.
1431 insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
1432 }
1433
1434 set_read_p(true);
1435
1436 BESDEBUG("dmrpp", "DmrppArray::"<< __func__ << "() for " << name() << " END"<< endl);
1437}
1438#endif
1439
1440void
1441DmrppArray::set_send_p(bool state)
1442{
1443 if (!get_attributes_loaded())
1444 load_attributes(this);
1445
1446 Array::set_send_p(state);
1447}
1448
1460bool DmrppArray::read()
1461{
1462 // If the chunks are not loaded, load them now. NB: load_chunks()
1463 // reads data for HDF5 COMPACT storage, so read_p() will be true.
1464 // Thus, call load_chunks() before testing read_p() to cover that
1465 // case. jhrg 11/15/21
1466 if (!get_chunks_loaded())
1467 load_chunks(this);
1468
1469 if (read_p()) return true;
1470
1471 // Single chunk and 'contiguous' are the same for this code.
1472
1473 if (get_chunks_size() == 1) {
1474 BESDEBUG(dmrpp_4, "Calling read_contiguous() for " << name() << endl);
1475 read_contiguous(); // Throws on various errors
1476 }
1477 else { // Handle the more complex case where the data is chunked.
1478 if (!is_projected()) {
1479 BESDEBUG(dmrpp_4, "Calling read_chunks_unconstrained() for " << name() << endl);
1480 read_chunks_unconstrained();
1481 }
1482 else {
1483 BESDEBUG(dmrpp_4, "Calling read_chunks() for " << name() << endl);
1484 read_chunks();
1485 }
1486 }
1487
1488 if (this->twiddle_bytes()) {
1489 int num = this->length();
1490 Type var_type = this->var()->type();
1491
1492 switch (var_type) {
1493 case dods_int16_c:
1494 case dods_uint16_c: {
1495 dods_uint16 *local = reinterpret_cast<dods_uint16*>(this->get_buf());
1496 while (num--) {
1497 *local = bswap_16(*local);
1498 local++;
1499 }
1500 break;
1501 }
1502 case dods_int32_c:
1503 case dods_uint32_c: {
1504                dods_uint32 *local = reinterpret_cast<dods_uint32*>(this->get_buf());
1505 while (num--) {
1506 *local = bswap_32(*local);
1507 local++;
1508 }
1509 break;
1510 }
1511 case dods_int64_c:
1512 case dods_uint64_c: {
1513                dods_uint64 *local = reinterpret_cast<dods_uint64*>(this->get_buf());
1514 while (num--) {
1515 *local = bswap_64(*local);
1516 local++;
1517 }
1518 break;
1519 }
1520 default: break; // Do nothing for all other types.
1521 }
1522 }
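    // Example (illustrative): a dods_uint32 stored big-endian and read on a
    // little-endian host arrives as 0x11223344 and leaves bswap_32() as
    // 0x44332211. Single-byte types need no swap and fall through the
    // default: case above.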
1523
1524 return true;
1525}
1526
1531class PrintD4ArrayDimXMLWriter : public unary_function<Array::dimension &, void> {
1532 XMLWriter &xml;
1533 // Was this variable constrained using local/direct slicing? i.e., is d_local_constraint set?
1534 // If so, don't use shared dimensions; instead emit Dim elements that are anonymous.
1535 bool d_constrained;
1536public:
1537
1538 PrintD4ArrayDimXMLWriter(XMLWriter &xml, bool c) :
1539 xml(xml), d_constrained(c)
1540 {
1541 }
1542
1543 void operator()(Array::dimension &d)
1544 {
1545 // This duplicates code in D4Dimensions (where D4Dimension::print_dap4() is defined
1546 // because of the need to print the constrained size of a dimension). I think that
1547 // the constraint information has to be kept here and not in the dimension (since they
1548 // are shared dims). Could hack print_dap4() to take the constrained size, however.
1549 if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) "Dim") < 0)
1550 throw InternalErr(__FILE__, __LINE__, "Could not write Dim element");
1551
1552 string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
1553 // If there is a name, there must be a Dimension (named dimension) in scope
1554 // so write its name but not its size.
1555 if (!d_constrained && !name.empty()) {
1556 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
1557 (const xmlChar *) name.c_str()) < 0)
1558 throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
1559 }
1560 else if (d.use_sdim_for_slice) {
1561 assert(!name.empty());
1562 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
1563 (const xmlChar *) name.c_str()) < 0)
1564 throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
1565 }
1566 else {
1567 ostringstream size;
1568 size << (d_constrained ? d.c_size : d.size);
1569 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "size",
1570 (const xmlChar *) size.str().c_str()) < 0)
1571                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for size");
1572 }
1573
1574 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1575 throw InternalErr(__FILE__, __LINE__, "Could not end Dim element");
1576 }
1577};
1578
1579class PrintD4ConstructorVarXMLWriter : public unary_function<BaseType *, void> {
1580 XMLWriter &xml;
1581 bool d_constrained;
1582public:
1583 PrintD4ConstructorVarXMLWriter(XMLWriter &xml, bool c) :
1584 xml(xml), d_constrained(c)
1585 {
1586 }
1587
1588 void operator()(BaseType *btp)
1589 {
1590 btp->print_dap4(xml, d_constrained);
1591 }
1592};
1593
1594class PrintD4MapXMLWriter : public unary_function<D4Map *, void> {
1595 XMLWriter &xml;
1596
1597public:
1598 PrintD4MapXMLWriter(XMLWriter &xml) :
1599 xml(xml)
1600 {
1601 }
1602
1603 void operator()(D4Map *m)
1604 {
1605 m->print_dap4(xml);
1606 }
1607};
1609
1633void DmrppArray::print_dap4(XMLWriter &xml, bool constrained /*false*/)
1634{
1635 if (constrained && !send_p()) return;
1636
1637 if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) var()->type_name().c_str()) < 0)
1638 throw InternalErr(__FILE__, __LINE__, "Could not write " + type_name() + " element");
1639
1640 if (!name().empty())
1641 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name", (const xmlChar *) name().c_str()) <
1642 0)
1643 throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
1644
1645 // Hack job... Copied from D4Enum::print_xml_writer. jhrg 11/12/13
1646 if (var()->type() == dods_enum_c) {
1647 D4Enum *e = static_cast<D4Enum *>(var());
1648 string path = e->enumeration()->name();
1649 if (e->enumeration()->parent()) {
1650 // print the FQN for the enum def; D4Group::FQN() includes the trailing '/'
1651 path = static_cast<D4Group *>(e->enumeration()->parent()->parent())->FQN() + path;
1652 }
1653 if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "enum", (const xmlChar *) path.c_str()) < 0)
1654 throw InternalErr(__FILE__, __LINE__, "Could not write attribute for enum");
1655 }
1656
1657 if (prototype()->is_constructor_type()) {
1658 Constructor &c = static_cast<Constructor &>(*prototype());
1659 for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
1660 // bind2nd(mem_fun_ref(&BaseType::print_dap4), xml));
1661 }
1662
1663    // Drop the local_constraint which is per-array and use a per-dimension one instead
1664 for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));
1665
1666 attributes()->print_dap4(xml);
1667
1668 for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));
1669
1670 // Only print the chunks' info if there. This is the code added to libdap::Array::print_dap4().
1671 // jhrg 5/10/18
1672    if (DmrppCommon::d_print_chunks && get_chunks_size() > 0)
1673        print_chunks_element(xml, DmrppCommon::d_ns_prefix);
1674
1675 // If this variable uses the COMPACT layout, encode the values for
1676 // the array using base64. Note that strings are a special case; each
1677 // element of the array is a string and is encoded in its own base64
1678 // xml element. So, while an array of 10 int32 will be encoded in a
1679 // single base64 element, an array of 10 strings will use 10 base64
1680 // elements. This is because the size of each string's value is different.
1681 // Not so for an int32.
1682 if (DmrppCommon::d_print_chunks && is_compact_layout() && read_p()) {
1683 switch (var()->type()) {
1684 case dods_byte_c:
1685 case dods_char_c:
1686 case dods_int8_c:
1687 case dods_uint8_c:
1688 case dods_int16_c:
1689 case dods_uint16_c:
1690 case dods_int32_c:
1691 case dods_uint32_c:
1692 case dods_int64_c:
1693 case dods_uint64_c:
1694
1695 case dods_enum_c:
1696
1697 case dods_float32_c:
1698 case dods_float64_c: {
1699 u_int8_t *values = 0;
1700 try {
1701 size_t size = buf2val(reinterpret_cast<void **>(&values));
1702                    string encoded = base64::Base64::encode(values, size);
1703                    print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
1704 delete[] values;
1705 }
1706 catch (...) {
1707 delete[] values;
1708 throw;
1709 }
1710 break;
1711 }
1712
1713 case dods_str_c:
1714 case dods_url_c: {
1715 string *values = 0;
1716 try {
1717 // discard the return value of buf2val()
1718 buf2val(reinterpret_cast<void **>(&values));
1719 string str;
1720 for (int i = 0; i < length(); ++i) {
1721 str = (*(static_cast<string *> (values) + i));
1722                        string encoded = base64::Base64::encode(reinterpret_cast<const u_int8_t *>(str.c_str()), str.size());
1723                        print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
1724 }
1725 delete[] values;
1726 }
1727 catch (...) {
1728 delete[] values;
1729 throw;
1730 }
1731 break;
1732 }
1733
1734 default:
1735 throw InternalErr(__FILE__, __LINE__, "Vector::val2buf: bad type");
1736 }
1737 }
1738 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1739 throw InternalErr(__FILE__, __LINE__, "Could not end " + type_name() + " element");
1740}
1741
1742void DmrppArray::dump(ostream &strm) const
1743{
1744 strm << BESIndent::LMarg << "DmrppArray::" << __func__ << "(" << (void *) this << ")" << endl;
1745 BESIndent::Indent();
1746 DmrppCommon::dump(strm);
1747 Array::dump(strm);
1748 strm << BESIndent::LMarg << "value: " << "----" << /*d_buf <<*/endl;
1749 BESIndent::UnIndent();
1750}
1751
1752} // namespace dmrpp