00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_queue_internal_H
00022 #define __TBB_concurrent_queue_internal_H
00023
00024 #include "tbb_stddef.h"
00025 #include "tbb_machine.h"
00026 #include "atomic.h"
00027 #include "spin_mutex.h"
00028 #include "cache_aligned_allocator.h"
00029 #include "tbb_exception.h"
00030 #include <new>
00031
00032 #if !TBB_USE_EXCEPTIONS && _MSC_VER
00033
00034 #pragma warning (push)
00035 #pragma warning (disable: 4530)
00036 #endif
00037
00038 #include <iterator>
00039
00040 #if !TBB_USE_EXCEPTIONS && _MSC_VER
00041 #pragma warning (pop)
00042 #endif
00043
00044
00045 namespace tbb {
00046
00047 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00048
00049
00050 namespace strict_ppl {
00051 template<typename T, typename A> class concurrent_queue;
00052 }
00053
00054 template<typename T, typename A> class concurrent_bounded_queue;
00055
00056 namespace deprecated {
00057 template<typename T, typename A> class concurrent_queue;
00058 }
00059 #endif
00060
00062 namespace strict_ppl {
00063
00065 namespace internal {
00066
00067 using namespace tbb::internal;
00068
00069 typedef size_t ticket;
00070
00071 template<typename T> class micro_queue ;
00072 template<typename T> class micro_queue_pop_finalizer ;
00073 template<typename T> class concurrent_queue_base_v3;
00074
00076
//! Parts of the queue representation that do not depend on the element type.
/** head_counter and tail_counter are separated by cache-line-sized pads
    (pad1/pad2) so producers and consumers do not false-share a line. */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Multiplier used by concurrent_queue_rep::index to scatter tickets.
    // 3 is odd, hence coprime with n_queue==8, so k*phi%n_queue cycles
    // through all micro-queues.
    static const size_t phi = 3;

public:
    //! Number of micro-queues the queue is striped across.
    static const size_t n_queue = 8;

    //! Prefix on a page of items.
    struct page {
        page* next;
        uintptr_t mask;  // bit i set <=> slot i holds a successfully constructed item
    };

    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];  // isolate head_counter on its own cache line
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];  // isolate tail_counter on its own cache line

    //! Number of item slots per page; always a power of 2 (set in the queue constructor).
    size_t items_per_page;

    //! Size of an item in bytes.
    size_t item_size;

    //! Number of slots whose construction failed and that still await a pop to skip them.
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;
00113
00114 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
00115 return uintptr_t(p)>1;
00116 }
00117
00119
//! Abstract interface for allocating and freeing queue pages.
/** micro_queue and micro_queue_pop_finalizer manage page memory exclusively
    through this interface, letting the concrete queue type decide the actual
    page size and allocator (see concurrent_queue_base_v3::allocate_page). */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;
00132
00133 #if _MSC_VER && !defined(__INTEL_COMPILER)
00134
00135 #pragma warning( push )
00136 #pragma warning( disable: 4146 )
00137 #endif
00138
00140
//! One of the striped sub-queues that together form a concurrent_queue.
/** Has no constructor: the owning concurrent_queue_base_v3 zero-initializes
    the whole representation with memset. */
template<typename T>
class micro_queue : no_copy {
    typedef concurrent_queue_rep_base::page page;

    //! RAII guard that runs the item's destructor on scope exit,
    //! even if the operation using the item throws.
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    //! Copy-construct *src into slot 'index' of page dst.
    void copy_item( page& dst, size_t index, const void* src ) {
        new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
    }

    //! Copy-construct the item at src[sindex] into slot dindex of page dst.
    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
        new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
    }

    //! Assign the item in src[index] to *dst, then destroy the source item.
    /** The destroyer guarantees destruction even if the assignment throws. */
    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = from;
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    //! A page header followed by its item payload; 'last' marks where items start.
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field; slot storage begins here.
        T last;
    };

    //! Reference to slot 'index' of page p.
    /** Items live immediately after the page header, at the offset of
        padded_page::last plus index*sizeof(T). */
    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    //! Guards the head_page/tail_page linkage.
    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};
00204
//! Busy-wait with exponential backoff until counter reaches ticket k.
/** An odd counter value marks the micro-queue as broken by an exception
    (invalidate_page_and_rethrow sets tail_counter to k+n_queue+1, which is
    odd); in that case the waiter records an invalid entry and throws
    bad_last_alloc instead of spinning forever. */
template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    atomic_backoff backoff;
    do {
        backoff.pause();
        if( counter&1 ) {  // odd => queue invalidated by a failed operation
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    } while( counter!=k ) ;
}
00216
//! Append the item associated with ticket k to this micro-queue.
/** May block spinning until it is this ticket's turn. On a failed item copy,
    the slot is left invalid (mask bit clear) but tail_counter still advances
    so later tickets are not blocked, and the exception is rethrown. */
template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;  // round ticket down to a multiple of n_queue
    page* p = NULL;
    // Slot within the current page; items_per_page is a power of 2, so the
    // mask extracts index cheaply.
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    if( !index ) {
        // First slot of a new page: allocate before taking our turn so an
        // allocation failure can invalidate the queue instead of stalling it.
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );  // does not return normally
        }
        p->mask = 0;
        p->next = NULL;
    }

    // Wait until it is this ticket's turn to publish.
    if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );

    if( p ) {
        // Link the freshly allocated page at the tail, under the page mutex.
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item );
        // Copy succeeded: mark the slot valid, then hand the turn to the next ticket.
        p->mask |= uintptr_t(1)<<index;
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        // Copy failed: slot stays invalid (mask bit clear); advance anyway so
        // other producers/consumers make progress, then rethrow.
        ++base.my_rep->n_invalid_entries;
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
00259
//! Remove the item associated with ticket k; return true if a valid item was obtained.
/** Returns false when the slot was invalidated by a failed push, in which
    case the caller (internal_try_pop) claims another ticket and retries. */
template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;  // round ticket down to a multiple of n_queue
    // Wait until it is this ticket's turn, and until the matching push has advanced tail_counter.
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    page& p = *head_page;
    __TBB_ASSERT( &p, NULL );
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    bool success = false;
    {
        // On scope exit the finalizer publishes the next head_counter and, when
        // this was the last slot of the page, unlinks and frees the page —
        // even if the item assignment below throws.
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
        if( p.mask & uintptr_t(1)<<index ) {
            // Slot holds a valid item: move it out and destroy the source.
            success = true;
            assign_and_destroy_item( dst, p, index );
        } else {
            // Slot was invalidated by a failed push: consume the bookkeeping entry.
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
00280
//! Copy the contents of src into this (freshly zeroed) micro-queue.
/** Copies the partially filled first page, all full middle pages, and the
    partially filled last page. Not safe against concurrent modification of
    src. On exception the queue is invalidated and the exception rethrown. */
template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex = src.page_mutex;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            // First page may start mid-page at 'index'.
            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                // Middle pages are always full.
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                // Last page may be only partially filled; a remainder of 0
                // means the tail page is completely full.
                size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}
00320
//! Mark this micro-queue broken after an exception, then rethrow it.
/** Sets tail_counter to an odd value (k+n_queue+1), which spinning producers
    detect in spin_wait_until_my_turn, and links the sentinel value 1 as the
    tail page, which is_valid_page() rejects. */
template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Sentinel "page" (address 1): never dereferenced, only compared.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        tail_counter = k+concurrent_queue_rep_base::n_queue+1;  // odd => invalidated
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}
00337
00338 template<typename T>
00339 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
00340 concurrent_queue_page_allocator& pa = base;
00341 page* new_page = pa.allocate_page();
00342 new_page->next = NULL;
00343 new_page->mask = src_page->mask;
00344 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
00345 if( new_page->mask & uintptr_t(1)<<begin_in_page )
00346 copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
00347 return new_page;
00348 }
00349
//! RAII helper that completes a pop on scope exit.
/** Constructed inside micro_queue::pop; the destructor publishes the next
    head_counter value and, when my_page is non-NULL (last slot of the page
    was consumed), unlinks and deallocates that page. */
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;            // value to store into head_counter on exit
    micro_queue<T>& my_queue;
    page* my_page;               // page to retire, or NULL if the page is not yet exhausted
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};
00363
//! Finish the pop: retire an exhausted page and release the next ticket.
template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;  // non-NULL only when the last slot of the page was consumed
    if( is_valid_page(p) ) {
        // Unlink the exhausted page under the mutex guarding the page links.
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    // Publish the new head counter: the next ticket's pop may now proceed.
    my_queue.head_counter = my_ticket;
    if( is_valid_page(p) ) {
        // Free the page outside the lock and after releasing the next popper.
        allocator.deallocate_page( p );
    }
}
00380
00381 #if _MSC_VER && !defined(__INTEL_COMPILER)
00382 #pragma warning( pop )
00383 #endif // warning 4146 is back
00384
00385 template<typename T> class concurrent_queue_iterator_rep ;
00386 template<typename T> class concurrent_queue_iterator_base_v3;
00387
00389
//! Type-specific representation of a concurrent queue: an array of micro-queues.
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket k to the index of its micro-queue.
    /** phi (==3) is coprime with n_queue (==8), so consecutive tickets are
        scattered across different micro-queues, reducing contention. */
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    //! Micro-queue that handles ticket k.
    micro_queue<T>& choose( ticket k ) {
        // The formula in index() approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
00406
00408
//! Base class shared by strict_ppl::concurrent_queue instantiations.
/** Owns the cache-aligned representation and implements the ticket-based
    push/pop machinery; the derived container supplies allocate_block /
    deallocate_block for raw memory. */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
    //! Internal representation; allocated cache-aligned in the constructor.
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;

    //! Allocate a page big enough for items_per_page items.
    /** padded_page already contains storage for one item ('last'),
        hence the (items_per_page-1) in the size computation. */
    virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    //! Release a page previously obtained from allocate_page.
    virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! Obtain raw memory of size n; supplied by the derived container.
    virtual void *allocate_block( size_t n ) = 0;

    //! Return raw memory of size n; supplied by the derived container.
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    virtual ~concurrent_queue_base_v3() {
#if __TBB_USE_ASSERT
        // All pages must have been freed (via clear/finish_clear) by now.
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue the item at src; claims a ticket and dispatches to a micro-queue.
    void internal_push( const void* src ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this );
    }

    //! Attempt to dequeue an item into dst; return true on success.
    bool internal_try_pop( void* dst ) ;

    //! Number of items currently in the queue (may be transiently inexact).
    size_t internal_size() const ;

    //! True if the queue held no items at a consistent snapshot.
    bool internal_empty() const ;

    //! Free remaining pages after items have been removed.
    void internal_finish_clear() ;

    //! Throw bad_alloc (used by derived containers on allocation failure).
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! Copy the contents of src; not safe against concurrent modification of src.
    void assign( const concurrent_queue_base_v3& src ) ;
};
00487
//! Construct an empty queue: cache-aligned, zero-filled representation.
template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    // Zero-fill: micro_queue has no constructor and relies on this.
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    // Power-of-2 page capacity, chosen so a page's payload is about 256 bytes
    // for small items (32*8, 16*16, 8*32, 4*64, 2*128).
    my_rep->items_per_page = item_size<=8 ? 32 :
                             item_size<=16 ? 16 :
                             item_size<=32 ? 8 :
                             item_size<=64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}
00505
//! Attempt to dequeue one item; return false only if the queue appeared empty.
/** Claims a head ticket with a CAS loop, then retries with the next ticket
    whenever the claimed slot turns out to be an invalid entry. */
template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( r.tail_counter<=k ) {
                // Queue was empty at this snapshot.
                return false;
            }
            // An item with ticket k existed when we looked; try to claim it.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
#pragma warning (push)
#pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
#pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another consumer won the race; retry with the updated head value.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );  // false => ticket k was invalid; claim another
    return true;
}
00534
//! Number of items in the queue; may be transiently inaccurate under concurrency.
/** Computes tail - head - invalid_entries as a signed quantity: concurrent
    pops between the reads can make it negative, which is clamped to 0. */
template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
00546
//! True if the queue held no valid items at a consistent snapshot.
template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // Re-read tail_counter: if it changed, the queue was non-empty at some
    // point between the two reads, so report non-empty.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}
00555
//! Free the pages that remain after all items have been removed.
/** Assumes items were already popped/destroyed: the assertion documents that
    at most one (now empty) page can remain in each micro-queue. */
template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}
00570
//! Copy the contents of src into this queue.
/** Not thread-safe with respect to src: the final assertion verifies the
    source's counters did not move during the copy. */
template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // Copy the scalar bookkeeping first...
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // ...then deep-copy each micro-queue's pages and items.
    for( size_t i = 0; i<r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}
00588
00589 template<typename Container, typename Value> class concurrent_queue_iterator;
00590
//! Type-dependent state of an iterator over a concurrent_queue.
/** Snapshots the queue's head counter and each micro-queue's head page at
    construction. NOTE(review): the queue itself is read live during
    iteration, so iterating while the queue is concurrently modified is
    presumably unsafe — confirm against the container's documentation. */
template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    //! Ticket of the first item at snapshot time.
    ticket head_counter;
    //! The queue being iterated over.
    const concurrent_queue_base_v3<T>& my_queue;
    //! Current page of each micro-queue.
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element.  Return true if end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};
00609
00610 template<typename T>
00611 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
00612 if( k==my_queue.my_rep->tail_counter ) {
00613 item = NULL;
00614 return true;
00615 } else {
00616 typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
00617 __TBB_ASSERT(p,NULL);
00618 size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
00619 item = µ_queue<T>::get_ref(*p,i);
00620 return (p->mask & uintptr_t(1)<<i)!=0;
00621 }
00622 }
00623
00625
//! Type-independent part of concurrent_queue_iterator (strict_ppl flavor).
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents the concurrent_queue being iterated over.
    /** NULL for a default-constructed (end) iterator. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to the current item; NULL when one past the last element.
    Value* my_item;

    //! Default constructor: an end iterator.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __GNUC__==4&&__GNUC_MINOR__==3
        // Memory fence to work around a gcc 4.3 code-generation issue.
        __TBB_release_consistency_helper();
#endif
    }

    //! Copy constructor: deep-copies the other iterator's state via assign.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : no_assign(), my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct an iterator pointing to the head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment: replaces this iterator's snapshot with a copy of other's.
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance to the next valid item (skipping invalid entries).
    void advance() ;

    //! Destructor releases the snapshot representation.
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};
00670
//! Construct an iterator positioned at the first valid item of queue.
template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    // Allocate raw cache-aligned storage, then placement-construct the snapshot.
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    // If the head slot is invalid, advance() skips to the first valid item.
    if( !my_rep->get_item(my_item, k) ) advance();
}
00678
00679 template<typename Value>
00680 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
00681 if( my_rep!=other.my_rep ) {
00682 if( my_rep ) {
00683 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00684 my_rep = NULL;
00685 }
00686 if( other.my_rep ) {
00687 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00688 new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
00689 }
00690 }
00691 my_item = other.my_item;
00692 }
00693
//! Advance the iterator to the next valid item, skipping invalid entries.
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    // Sanity check: my_item must still match what get_item computes for k.
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif
    size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
    if( i==queue.my_rep->items_per_page-1 ) {
        // Consumed the last slot of this micro-queue's current page:
        // move the snapshot on to the next page.
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // Recurse to skip invalid entries; depth is bounded by the number of
    // consecutive invalid slots.
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}
00713
00715
//! Strips const/volatile qualifiers from T.
/** Local C++03-compatible equivalent of std::remove_cv. */
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
00720
00722
//! Meets the requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public:
#endif
    //! Construct an iterator over queue; reserved for the friend container.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    //! Default constructor: an end iterator.
    concurrent_queue_iterator() {}

    //! Copy constructor; also allows const_iterator construction from iterator.
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to current item.
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue.
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post-increment.
    /** Note: returns a pointer to the pre-increment item, not an iterator copy. */
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
};
00772
00773
//! Iterators are equal iff they reference the same item.
template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

//! Iterators differ iff they reference different items.
template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}
00783
00784 }
00785
00787
00788 }
00789
00791 namespace internal {
00792
00793 class concurrent_queue_rep;
00794 class concurrent_queue_iterator_rep;
00795 class concurrent_queue_iterator_base_v3;
00796 template<typename Container, typename Value> class concurrent_queue_iterator;
00797
00799
//! For internal use only: type-independent base of the bounded/deprecated queues.
/** Methods marked __TBB_EXPORTED_METHOD are defined out-of-line in the TBB
    library binary, not in this header; this declaration is part of the
    exported ABI and must not change. */
class concurrent_queue_base_v3: no_copy {
    //! Internal representation (defined in the library, opaque here).
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page of items.
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue (bounded variant).
    ptrdiff_t my_capacity;

    //! Number of item slots per page.
    size_t items_per_page;

    //! Size of an item in bytes.
    size_t item_size;

#if __TBB_GCC_3_3_PROTECTED_BROKEN
public:
#endif
    //! A page header with item payload appended; 'last' marks where items start.
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue.
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue (blocking variant).
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Attempt to enqueue item; fails (returns false) if the queue is full.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item; fails (returns false) if the queue is empty.
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Number of items in the queue.
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! True if the queue contains no items.
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the capacity of the queue.
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! Custom page allocator, supplied by the derived container.
    virtual page *allocate_page() = 0;

    //! Custom page deallocator, supplied by the derived container.
    virtual void deallocate_page( page *p ) = 0;

    //! Free remaining pages after items have been removed.
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! Throw an exception signalling allocation failure.
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! Copy the contents of src; not safe against concurrent modification of src.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};
00887
00889
//! Type-independent part of concurrent_queue_iterator (deprecated/bounded flavor).
/** Exported methods are defined in the TBB library binary. */
class concurrent_queue_iterator_base_v3 {
    //! Represents the concurrent_queue being iterated over.
    /** NULL for a default-constructed (end) iterator. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    //! Shared construction logic; offset_of_data locates items inside a page.
    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item; NULL when one past the last element.
    void* my_item;

    //! Default constructor: an end iterator.
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor: deep-copies the other iterator's state via assign.
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Obsolete entry point retained for binary compatibility.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct an iterator over queue; offset_of_data is the item offset within a padded_page.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment: replaces this iterator's state with a copy of i's.
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance to next item in queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor releases the iterator's representation.
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};
00930
00931 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
00932
00934
//! Meets the requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;

    template<typename T, class A>
    friend class ::tbb::deprecated::concurrent_queue;
#else
public:
#endif
    //! Construct an iterator over queue; reserved for the friend containers.
    /** The offsetof expression tells the base class where items start in a page. */
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    //! Default constructor: an end iterator.
    concurrent_queue_iterator() {}

    //! Copy constructor; also allows const_iterator construction from iterator.
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment.
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item.
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue.
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post-increment.
    /** Note: returns a pointer to the pre-increment item, not an iterator copy. */
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
};
00990
00991
//! Iterators are equal iff they reference the same item.
template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

//! Iterators differ iff they reference different items.
template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}
01001
01002 }
01003
01005
01006 }
01007
01008 #endif