concurrent_hash_map.h

00001 /*
00002     Copyright 2005-2009 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_concurrent_hash_map_H
00022 #define __TBB_concurrent_hash_map_H
00023 
00024 #include <stdexcept>
00025 #include <iterator>
00026 #include <utility>      // Need std::pair
00027 #include <cstring>      // Need std::memset
00028 #include <string>
00029 #include "tbb_stddef.h"
00030 #include "cache_aligned_allocator.h"
00031 #include "tbb_allocator.h"
00032 #include "spin_rw_mutex.h"
00033 #include "atomic.h"
00034 #include "aligned_space.h"
00035 #if TBB_USE_PERFORMANCE_WARNINGS
00036 #include <typeinfo>
00037 #endif
00038 
00039 namespace tbb {
00040 
00041 template<typename T> struct tbb_hash_compare;
00042 template<typename Key, typename T, typename HashCompare = tbb_hash_compare<Key>, typename A = tbb_allocator<std::pair<Key, T> > >
00043 class concurrent_hash_map;
00044 
00046 namespace internal {
00048     void* __TBB_EXPORTED_FUNC itt_load_pointer_with_acquire_v3( const void* src );
00050     void __TBB_EXPORTED_FUNC itt_store_pointer_with_release_v3( void* dst, void* src );
00052     void* __TBB_EXPORTED_FUNC itt_load_pointer_v3( const void* src );
00053 
00055     typedef size_t hashcode_t;
00057     class hash_map_base {
00058     public:
00060         typedef size_t size_type;
00062         typedef size_t hashcode_t;
00064         typedef size_t segment_index_t;
00066         struct node_base : no_copy {
00068             typedef spin_rw_mutex mutex_t;
00070             typedef mutex_t::scoped_lock scoped_t;
00072             node_base *next;
00073             mutex_t mutex;
00074         };
00076 #       define __TBB_rehash_req reinterpret_cast<node_base*>(1)
00078 #       define __TBB_empty_rehashed reinterpret_cast<node_base*>(0)
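              // These two sentinel values are stored directly in bucket::node_list:
              // 1 (__TBB_rehash_req) marks a bucket of a freshly enabled segment whose
              // contents still live in its "parent" bucket and must be rehashed on first
              // access (see bucket_accessor::acquire and rehash_bucket below), while
              // 0 (__TBB_empty_rehashed) marks a bucket that has been rehashed and is
              // currently empty; is_valid() treats anything above 1 as a real node pointer.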
00080         struct bucket : no_copy {
00082             typedef spin_rw_mutex mutex_t;
00084             typedef mutex_t::scoped_lock scoped_t;
00085             mutex_t mutex;
00086             node_base *node_list;
00087         };
00089         static size_type const embedded_block = 1;
00091         static size_type const embedded_buckets = 1<<embedded_block;
00093         static size_type const first_block = 8; // including embedded_block; with 16-byte buckets the first block spans 4096 bytes, keeping segment allocations in page-sized multiples
00095         static size_type const pointers_per_table = sizeof(segment_index_t) * 8; // one segment per bit
00097         typedef bucket *segment_ptr_t;
00099         typedef segment_ptr_t segments_table_t[pointers_per_table];
00101         atomic<hashcode_t> my_mask;
00103         segments_table_t my_table;
00105         atomic<size_type> my_size; // It must be in separate cache line from my_mask due to performance effects
00107         bucket my_embedded_segment[embedded_buckets];
00108 
00110         hash_map_base() {
00111             std::memset( this, 0, pointers_per_table*sizeof(segment_ptr_t) // 32*4=128   or 64*8=512
00112                 + sizeof(my_size) + sizeof(my_mask)  // 4+4 or 8+8
00113                 + embedded_buckets*sizeof(bucket) ); // n*8 or n*16
00114             for( size_type i = 0; i < embedded_block; i++ ) // fill the table
00115                 my_table[i] = my_embedded_segment + segment_base(i);
00116             my_mask = embedded_buckets - 1;
00117             __TBB_ASSERT( embedded_block <= first_block, "The first block number must include embedded blocks");
00118         }
00119 
00121         static segment_index_t segment_index_of( size_type index ) {
00122             return segment_index_t( __TBB_Log2( index|1 ) );
00123         }
00124 
00126         static segment_index_t segment_base( segment_index_t k ) {
00127             return (segment_index_t(1)<<k & ~segment_index_t(1));
00128         }
00129 
00131         static size_type segment_size( segment_index_t k ) {
00132             return size_type(1)<<k; // fake value for k==0
00133         }
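              // Worked example of the segment arithmetic above: bucket indices 0-1 live in
              // segment 0 (the embedded buckets), 2-3 in segment 1, 4-7 in segment 2,
              // 8-15 in segment 3, and so on; segment k>0 starts at bucket 1<<k and holds
              // 1<<k buckets. For bucket index 6: segment_index_of(6) == __TBB_Log2(6|1) == 2
              // and segment_base(2) == 4, so the bucket sits at offset 6-4 == 2 within
              // segment 2. segment_size(0) == 1 is only a placeholder, since segment 0
              // actually holds embedded_buckets (2) buckets.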
00134         
00136         static bool is_valid( void *ptr ) {
00137             return ptr > reinterpret_cast<void*>(1);
00138         }
00139 
00141         static void init_buckets( segment_ptr_t ptr, size_type sz, bool is_initial ) {
00142             if( is_initial ) std::memset(ptr, 0, sz*sizeof(bucket) );
00143             else for(size_type i = 0; i < sz; i++, ptr++) {
00144                     *reinterpret_cast<intptr_t*>(&ptr->mutex) = 0;
00145                     ptr->node_list = __TBB_rehash_req;
00146                 }
00147         }
00148         
00150         static void add_to_bucket( bucket *b, node_base *n ) {
00151             n->next = b->node_list;
00152             b->node_list = n; // it's under lock and the flag is set
00153         }
00154 
00156         struct enable_segment_failsafe {
00157             segment_ptr_t *my_segment_ptr;
00158             enable_segment_failsafe(segments_table_t &table, segment_index_t k) : my_segment_ptr(&table[k]) {}
00159             ~enable_segment_failsafe() {
00160                 if( my_segment_ptr ) *my_segment_ptr = 0; // indicate no allocation in progress
00161             }
00162         };
00163 
00165         void enable_segment( segment_index_t k, bool is_initial = false ) {
00166             __TBB_ASSERT( k, "Zero segment must be embedded" );
00167             enable_segment_failsafe watchdog( my_table, k );
00168             cache_aligned_allocator<bucket> alloc;
00169             size_type sz;
00170             __TBB_ASSERT( !is_valid(my_table[k]), "Wrong concurrent assignment");
00171             if( k >= first_block ) {
00172                 sz = segment_size( k );
00173                 segment_ptr_t ptr = alloc.allocate( sz );
00174                 init_buckets( ptr, sz, is_initial );
00175 #if TBB_USE_THREADING_TOOLS
00176                 // TODO: actually, fence and notification are unnecessary here and below
00177                 itt_store_pointer_with_release_v3( my_table + k, ptr );
00178 #else
00179                 my_table[k] = ptr; // the my_mask store below provides the release fence
00180 #endif
00181                 sz <<= 1; // double it to get the entire capacity of the container
00182             } else { // the first block
00183                 __TBB_ASSERT( k == embedded_block, "Wrong segment index" );
00184                 sz = segment_size( first_block );
00185                 segment_ptr_t ptr = alloc.allocate( sz - embedded_buckets );
00186                 init_buckets( ptr, sz - embedded_buckets, is_initial );
00187                 ptr -= segment_base(embedded_block);
00188                 for(segment_index_t i = embedded_block; i < first_block; i++) // calc the offsets
00189 #if TBB_USE_THREADING_TOOLS
00190                     itt_store_pointer_with_release_v3( my_table + i, ptr + segment_base(i) );
00191 #else
00192                     my_table[i] = ptr + segment_base(i);
00193 #endif
00194             }
00195 #if TBB_USE_THREADING_TOOLS
00196             itt_store_pointer_with_release_v3( &my_mask, (void*)(sz-1) );
00197 #else
00198             my_mask = sz - 1;
00199 #endif
00200             watchdog.my_segment_ptr = 0;
00201         }
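              // Growth in numbers: the very first call (k == embedded_block == 1) allocates
              // segment_size(first_block) - embedded_buckets == 254 buckets as one block,
              // points my_table[1..7] into it, and publishes my_mask == 255 (256 buckets in
              // total). Each later call enables a single segment of 1<<k buckets and doubles
              // the published capacity; e.g. enabling segment 8 adds 256 buckets and sets
              // my_mask to 511.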
00202 
00204         bucket *get_bucket( hashcode_t h ) const throw() { // TODO: add throw() everywhere?
00205             segment_index_t s = segment_index_of( h );
00206             h -= segment_base(s);
00207             segment_ptr_t seg = my_table[s];
00208             __TBB_ASSERT( is_valid(seg), "hashcode must be cut by valid mask for allocated segments" );
00209             return &seg[h];
00210         }
00211 
00213         // Splitting into two functions should help inlining
00214         inline bool check_mask_race( const hashcode_t h, hashcode_t &m ) const {
00215             hashcode_t m_now, m_old = m;
00216 #if TBB_USE_THREADING_TOOLS
00217             m_now = (hashcode_t) itt_load_pointer_with_acquire_v3( &my_mask );
00218 #else
00219             m_now = my_mask;
00220 #endif
00221             if( m_old != m_now )
00222                 return check_rehashing_collision( h, m_old, m = m_now );
00223             return false;
00224         }
00225 
00227         bool check_rehashing_collision( const hashcode_t h, hashcode_t m_old, hashcode_t m ) const {
00228             __TBB_ASSERT(m_old != m, NULL); // TODO?: m arg could be optimized out by passing h = h&m
00229             if( (h & m_old) != (h & m) ) { // mask changed for this hashcode, rare event
00230                 // condition above proves that 'h' has some other bits set beside 'm_old'
00231                 // find next applicable mask after m_old    //TODO: look at bsl instruction
00232                 for( ++m_old; !(h & m_old); m_old <<= 1 ); // at most a few iterations, depending on the first block size
00233                 m_old = (m_old<<1) - 1; // get full mask from a bit
00234                 __TBB_ASSERT((m_old&(m_old+1))==0 && m_old <= m, NULL);
00235                 // check whether the bucket under the recovered mask is being rehashed or already rehashed
00236 #if TBB_USE_THREADING_TOOLS
00237                 if( itt_load_pointer_with_acquire_v3(&( get_bucket(h & m_old)->node_list )) != __TBB_rehash_req )
00238 #else
00239                 if( __TBB_load_with_acquire(get_bucket( h & m_old )->node_list) != __TBB_rehash_req )
00240 #endif
00241                     return true;
00242             }
00243             return false;
00244         }
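              // Example of the mask-recovery loop above: with h == 0x0B, m_old == 0x03 and a
              // current mask m == 0x3F, (h & m_old) == 0x03 differs from (h & m) == 0x0B, so
              // the loop advances ++m_old to 4 (bit clear in h) and then 8 (bit set in h),
              // rebuilding the full mask m_old == 0x0F. If bucket 0x0B == (h & 0x0F) is no
              // longer marked __TBB_rehash_req, the item may already have been moved out of
              // the bucket searched under the old mask, so the caller must retry.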
00245 
00247         segment_index_t insert_new_node( bucket *b, node_base *n, hashcode_t mask ) {
00248             size_type sz = ++my_size; // prefix form ensures the first segment allocation is triggered right after the first item is inserted
00249             add_to_bucket( b, n );
00250             // check load factor
00251             if( sz >= mask ) { // TODO: add custom load_factor 
00252                 segment_index_t new_seg = segment_index_of( mask+1 );
00253                 __TBB_ASSERT( is_valid(my_table[new_seg-1]), "new allocations must not publish the new mask until the segment is allocated");
00254 #if TBB_USE_THREADING_TOOLS
00255                 if( !itt_load_pointer_v3(my_table+new_seg)
00256 #else
00257                 if( !my_table[new_seg]
00258 #endif
00259                                   && __TBB_CompareAndSwapW(&my_table[new_seg], 1, 0) == 0 )
00260                     return new_seg; // the caller must enable this segment
00261             }
00262             return 0;
00263         }
00264 
00266         void reserve(size_type buckets) {
00267             if( !buckets-- ) return;
00268             bool is_initial = !my_size;
00269             for( size_type m = my_mask; buckets > m; m = my_mask )
00270                 enable_segment( segment_index_of( m+1 ), is_initial );
00271         }
00273         void internal_swap(hash_map_base &table) {
00274             std::swap(this->my_mask, table.my_mask);
00275             std::swap(this->my_size, table.my_size);
00276             for(size_type i = 0; i < embedded_buckets; i++)
00277                 std::swap(this->my_embedded_segment[i].node_list, table.my_embedded_segment[i].node_list);
00278             for(size_type i = embedded_block; i < pointers_per_table; i++)
00279                 std::swap(this->my_table[i], table.my_table[i]);
00280         }
00281     };
00282 
00283     template<typename Iterator>
00284     class hash_map_range;
00285 
00287 
00289     template<typename Container, typename Value>
00290     class hash_map_iterator
00291         : public std::iterator<std::forward_iterator_tag,Value>
00292     {
00293         typedef Container map_type;
00294         typedef typename Container::node node;
00295         typedef hash_map_base::node_base node_base;
00296         typedef hash_map_base::bucket bucket;
00297 
00298         template<typename C, typename T, typename U>
00299         friend bool operator==( const hash_map_iterator<C,T>& i, const hash_map_iterator<C,U>& j );
00300 
00301         template<typename C, typename T, typename U>
00302         friend bool operator!=( const hash_map_iterator<C,T>& i, const hash_map_iterator<C,U>& j );
00303 
00304         template<typename C, typename T, typename U>
00305         friend ptrdiff_t operator-( const hash_map_iterator<C,T>& i, const hash_map_iterator<C,U>& j );
00306     
00307         template<typename C, typename U>
00308         friend class internal::hash_map_iterator;
00309 
00310         template<typename I>
00311         friend class internal::hash_map_range;
00312 
00313         void advance_to_next_bucket() { // TODO?: refactor to iterator_base class
00314             size_t k = my_index+1;
00315             while( my_bucket && k <= my_map->my_mask ) {
00316                 // Following test uses 2's-complement wizardry
00317                 if( k & (k-2) ) // not the beginning of a segment
00318                     ++my_bucket;
00319                 else my_bucket = my_map->get_bucket( k );
00320                 my_node = static_cast<node*>( my_bucket->node_list );
00321                 if( hash_map_base::is_valid(my_node) ) {
00322                     my_index = k; return;
00323                 }
00324                 ++k;
00325             }
00326             my_bucket = 0; my_node = 0; my_index = k; // the end
00327         }
00328 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
00329         template<typename Key, typename T, typename HashCompare, typename A>
00330         friend class tbb::concurrent_hash_map;
00331 #else
00332     public: // workaround
00333 #endif
00335         const Container *my_map;
00336 
00338         size_t my_index;
00339 
00341         const bucket *my_bucket;
00342 
00344         node *my_node;
00345 
00346         hash_map_iterator( const Container &map, size_t index, const bucket *b, node_base *n );
00347 
00348     public:
00350         hash_map_iterator() {}
00351         hash_map_iterator( const hash_map_iterator<Container,typename Container::value_type> &other ) :
00352             my_map(other.my_map),
00353             my_index(other.my_index),
00354             my_bucket(other.my_bucket),
00355             my_node(other.my_node)
00356         {}
00357         Value& operator*() const {
00358             __TBB_ASSERT( hash_map_base::is_valid(my_node), "iterator uninitialized or at end of container?" );
00359             return my_node->item;
00360         }
00361         Value* operator->() const {return &operator*();}
00362         hash_map_iterator& operator++();
00363         
00365         Value* operator++(int) {
00366             Value* result = &operator*();
00367             operator++();
00368             return result;
00369         }
00370     };
00371 
00372     template<typename Container, typename Value>
00373     hash_map_iterator<Container,Value>::hash_map_iterator( const Container &map, size_t index, const bucket *b, node_base *n ) :
00374         my_map(&map),
00375         my_index(index),
00376         my_bucket(b),
00377         my_node( static_cast<node*>(n) )
00378     {
00379         if( b && !hash_map_base::is_valid(n) )
00380             advance_to_next_bucket();
00381     }
00382 
00383     template<typename Container, typename Value>
00384     hash_map_iterator<Container,Value>& hash_map_iterator<Container,Value>::operator++() {
00385         my_node = static_cast<node*>( my_node->next );
00386         if( !my_node ) advance_to_next_bucket();
00387         return *this;
00388     }
00389 
00390     template<typename Container, typename T, typename U>
00391     bool operator==( const hash_map_iterator<Container,T>& i, const hash_map_iterator<Container,U>& j ) {
00392         return i.my_node == j.my_node && i.my_map == j.my_map;
00393     }
00394 
00395     template<typename Container, typename T, typename U>
00396     bool operator!=( const hash_map_iterator<Container,T>& i, const hash_map_iterator<Container,U>& j ) {
00397         return i.my_node != j.my_node || i.my_map != j.my_map;
00398     }
00399 
00401 
00402     template<typename Iterator>
00403     class hash_map_range {
00404         typedef typename Iterator::map_type map_type;
00405         Iterator my_begin;
00406         Iterator my_end;
00407         mutable Iterator my_midpoint;
00408         size_t my_grainsize;
00410         void set_midpoint() const;
00411         template<typename U> friend class hash_map_range;
00412     public:
00414         typedef std::size_t size_type;
00415         typedef typename Iterator::value_type value_type;
00416         typedef typename Iterator::reference reference;
00417         typedef typename Iterator::difference_type difference_type;
00418         typedef Iterator iterator;
00419 
00421         bool empty() const {return my_begin==my_end;}
00422 
00424         bool is_divisible() const {
00425             return my_midpoint!=my_end;
00426         }
00428         hash_map_range( hash_map_range& r, split ) : 
00429             my_end(r.my_end),
00430             my_grainsize(r.my_grainsize)
00431         {
00432             r.my_end = my_begin = r.my_midpoint;
00433             __TBB_ASSERT( !empty(), "Splitting despite the range not being divisible" );
00434             __TBB_ASSERT( !r.empty(), "Splitting despite the range not being divisible" );
00435             set_midpoint();
00436             r.set_midpoint();
00437         }
00439         template<typename U>
00440         hash_map_range( hash_map_range<U>& r) : 
00441             my_begin(r.my_begin),
00442             my_end(r.my_end),
00443             my_midpoint(r.my_midpoint),
00444             my_grainsize(r.my_grainsize)
00445         {}
00446 #if TBB_DEPRECATED
00448         hash_map_range( const Iterator& begin_, const Iterator& end_, size_type grainsize = 1 ) : 
00449             my_begin(begin_), 
00450             my_end(end_),
00451             my_grainsize(grainsize)
00452         {
00453             if(!my_end.my_index && !my_end.my_bucket) // end
00454                 my_end.my_index = my_end.my_map->my_mask + 1;
00455             set_midpoint();
00456             __TBB_ASSERT( grainsize>0, "grainsize must be positive" );
00457         }
00458 #endif
00460         hash_map_range( const map_type &map, size_type grainsize = 1 ) : 
00461             my_begin( Iterator( map, 0, map.my_embedded_segment, map.my_embedded_segment->node_list ) ),
00462             my_end( Iterator( map, map.my_mask + 1, 0, 0 ) ),
00463             my_grainsize( grainsize )
00464         {
00465             __TBB_ASSERT( grainsize>0, "grainsize must be positive" );
00466             set_midpoint();
00467         }
00468         const Iterator& begin() const {return my_begin;}
00469         const Iterator& end() const {return my_end;}
00471         size_type grainsize() const {return my_grainsize;}
00472     };
00473 
00474     template<typename Iterator>
00475     void hash_map_range<Iterator>::set_midpoint() const {
00476         // Split by groups of nodes
00477         size_t m = my_end.my_index-my_begin.my_index;
00478         if( m > my_grainsize ) {
00479             m = my_begin.my_index + m/2u;
00480             hash_map_base::bucket *b = my_begin.my_map->get_bucket(m);
00481             my_midpoint = Iterator(*my_begin.my_map,m,b,b->node_list);
00482         } else {
00483             my_midpoint = my_end;
00484         }
00485         __TBB_ASSERT( my_begin.my_index <= my_midpoint.my_index,
00486             "my_begin is after my_midpoint" );
00487         __TBB_ASSERT( my_midpoint.my_index <= my_end.my_index,
00488             "my_midpoint is after my_end" );
00489         __TBB_ASSERT( my_begin != my_midpoint || my_begin == my_end,
00490             "[my_begin, my_midpoint) range should not be empty" );
00491     }
00492 } // namespace internal
00494 
00496 static const size_t hash_multiplier = sizeof(size_t)==4? 2654435769U : 11400714819323198485ULL;
00498 template<typename T>
00499 inline static size_t tbb_hasher( const T& t ) {
00500     return static_cast<size_t>( t ) * hash_multiplier;
00501 }
00502 template<typename P>
00503 inline static size_t tbb_hasher( P* ptr ) {
00504     size_t const h = reinterpret_cast<size_t>( ptr );
00505     return (h >> 3) ^ h;
00506 }
00507 template<typename E, typename S, typename A>
00508 inline static size_t tbb_hasher( const std::basic_string<E,S,A>& s ) {
00509     size_t h = 0;
00510     for( const E* c = s.c_str(); *c; c++ )
00511         h = static_cast<size_t>(*c) ^ (h * hash_multiplier);
00512     return h;
00513 }
00514 template<typename F, typename S>
00515 inline static size_t tbb_hasher( const std::pair<F,S>& p ) {
00516     return tbb_hasher(p.first) ^ tbb_hasher(p.second);
00517 }
00518 
00520 template<typename T>
00521 struct tbb_hash_compare {
00522     static size_t hash( const T& t ) { return tbb_hasher(t); }
00523     static bool equal( const T& a, const T& b ) { return a == b; }
00524 };
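      // A user-supplied HashCompare only needs the two members shown above. Illustrative
      // sketch (MyKey and its "id" field are hypothetical, not part of this header):
      //
      //     struct MyKeyHashCompare {
      //         static size_t hash( const MyKey& k ) { return tbb_hasher( k.id ); }
      //         static bool equal( const MyKey& a, const MyKey& b ) { return a.id == b.id; }
      //     };
      //     typedef tbb::concurrent_hash_map<MyKey, int, MyKeyHashCompare> MyMap;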
00525 
00527 
00555 template<typename Key, typename T, typename HashCompare, typename Allocator>
00556 class concurrent_hash_map : protected internal::hash_map_base {
00557     template<typename Container, typename Value>
00558     friend class internal::hash_map_iterator;
00559 
00560     template<typename I>
00561     friend class internal::hash_map_range;
00562 
00563 public:
00564     typedef Key key_type;
00565     typedef T mapped_type;
00566     typedef std::pair<const Key,T> value_type;
00567     typedef internal::hash_map_base::size_type size_type;
00568     typedef ptrdiff_t difference_type;
00569     typedef value_type *pointer;
00570     typedef const value_type *const_pointer;
00571     typedef value_type &reference;
00572     typedef const value_type &const_reference;
00573     typedef internal::hash_map_iterator<concurrent_hash_map,value_type> iterator;
00574     typedef internal::hash_map_iterator<concurrent_hash_map,const value_type> const_iterator;
00575     typedef internal::hash_map_range<iterator> range_type;
00576     typedef internal::hash_map_range<const_iterator> const_range_type;
00577     typedef Allocator allocator_type;
00578 
00579 protected:
00580     friend class const_accessor;
00581     struct node;
00582     typedef typename Allocator::template rebind<node>::other node_allocator_type;
00583     node_allocator_type my_allocator;
00584     HashCompare my_hash_compare;
00585 
00586     struct node : public node_base {
00587         value_type item;
00588         node( const Key &key ) : item(key, T()) {}
00589         node( const Key &key, const T &t ) : item(key, t) {}
00590         // exception-safe allocation, see C++ Standard 2003, clause 5.3.4p17
00591         void *operator new( size_t /*size*/, node_allocator_type &a ) {
00592             void *ptr = a.allocate(1);
00593             if(!ptr) throw std::bad_alloc();
00594             return ptr;
00595         }
00596         // matching operator delete for the placement form above; called if the constructor throws
00597         void operator delete( void *ptr, node_allocator_type &a ) { a.deallocate(static_cast<node*>(ptr),1); }
00598     };
00599 
00600     void delete_node( node_base *n ) {
00601         my_allocator.destroy( static_cast<node*>(n) );
00602         my_allocator.deallocate( static_cast<node*>(n), 1);
00603     }
00604 
00605     node *search_bucket( const key_type &key, bucket *b ) const {
00606         node *n = static_cast<node*>( b->node_list );
00607         while( is_valid(n) && !my_hash_compare.equal(key, n->item.first) )
00608             n = static_cast<node*>( n->next );
00609         __TBB_ASSERT(n != __TBB_rehash_req, "Search can be executed only on a rehashed bucket");
00610         return n;
00611     }
00612 
00614     class bucket_accessor : public bucket::scoped_t {
00615         bool my_is_writer; // TODO: use it from base type
00616         bucket *my_b;
00617     public:
00618         bucket_accessor( concurrent_hash_map *base, const hashcode_t h, bool writer = false ) { acquire( base, h, writer ); }
00620         inline void acquire( concurrent_hash_map *base, const hashcode_t h, bool writer = false ) {
00621             my_b = base->get_bucket( h );
00622 #if TBB_USE_THREADING_TOOLS
00623             // TODO: actually, notification is unnecessary here; it just hides the double-check
00624             if( itt_load_pointer_with_acquire_v3(&my_b->node_list) == __TBB_rehash_req
00625 #else
00626             if( __TBB_load_with_acquire(my_b->node_list) == __TBB_rehash_req
00627 #endif
00628                 && try_acquire( my_b->mutex, /*write=*/true ) )
00629             {
00630                 if( my_b->node_list == __TBB_rehash_req ) base->rehash_bucket( my_b, h ); //recursive rehashing
00631                 my_is_writer = true;
00632             }
00633             else bucket::scoped_t::acquire( my_b->mutex, /*write=*/my_is_writer = writer );
00634             __TBB_ASSERT( my_b->node_list != __TBB_rehash_req, NULL);
00635         }
00637         bool is_writer() { return my_is_writer; }
00639         bucket *operator() () { return my_b; }
00640         // TODO: optimize out
00641         bool upgrade_to_writer() { my_is_writer = true; return bucket::scoped_t::upgrade_to_writer(); }
00642     };
00643 
00644     // TODO refactor to hash_base
00645     void rehash_bucket( bucket *b_new, const hashcode_t h ) {
00646         __TBB_ASSERT( *(intptr_t*)(&b_new->mutex), "b_new must be locked (for write)");
00647         __TBB_ASSERT( h > 1, "The lowermost buckets can't be rehashed" );
00648         __TBB_store_with_release(b_new->node_list, __TBB_empty_rehashed); // mark rehashed
00649         hashcode_t mask = ( 1u<<__TBB_Log2( h ) ) - 1; // get parent mask from the topmost bit
00650 
00651         bucket_accessor b_old( this, h & mask );
00652 
00653         mask = (mask<<1) | 1; // get full mask for new bucket
00654         __TBB_ASSERT( (mask&(mask+1))==0 && (h & mask) == h, NULL );
00655     restart:
00656         for( node_base **p = &b_old()->node_list, *n = __TBB_load_with_acquire(*p); is_valid(n); n = *p ) {
00657             hashcode_t c = my_hash_compare.hash( static_cast<node*>(n)->item.first );
00658             if( (c & mask) == h ) {
00659                 if( !b_old.is_writer() )
00660                     if( !b_old.upgrade_to_writer() ) {
00661                         goto restart; // node ptr can be invalid due to concurrent erase
00662                     }
00663                 *p = n->next; // exclude from b_old
00664                 add_to_bucket( b_new, n );
00665             } else p = &n->next; // iterate to next item
00666         }
00667     }
00668 
00669 public:
00670     
00671     class accessor;
00673     class const_accessor {
00674         friend class concurrent_hash_map<Key,T,HashCompare,Allocator>;
00675         friend class accessor;
00676         void operator=( const accessor & ) const; // Deny access
00677         const_accessor( const accessor & );       // Deny access
00678     public:
00680         typedef const typename concurrent_hash_map::value_type value_type;
00681 
00683         bool empty() const {return !my_node;}
00684 
00686         void release() {
00687             if( my_node ) {
00688                 my_lock.release();
00689                 my_node = 0;
00690             }
00691         }
00692 
00694         const_reference operator*() const {
00695             __TBB_ASSERT( my_node, "attempt to dereference empty accessor" );
00696             return my_node->item;
00697         }
00698 
00700         const_pointer operator->() const {
00701             return &operator*();
00702         }
00703 
00705         const_accessor() : my_node(NULL) {}
00706 
00708         ~const_accessor() {
00709             my_node = NULL; // my_lock.release() is called in scoped_lock destructor
00710         }
00711     private:
00712         node *my_node;
00713         typename node::scoped_t my_lock;
00714         hashcode_t my_hash;
00715     };
00716 
00718     class accessor: public const_accessor {
00719     public:
00721         typedef typename concurrent_hash_map::value_type value_type;
00722 
00724         reference operator*() const {
00725             __TBB_ASSERT( this->my_node, "attempt to dereference empty accessor" );
00726             return this->my_node->item;
00727         }
00728 
00730         pointer operator->() const {
00731             return &operator*();
00732         }
00733     };
00734 
00736     concurrent_hash_map(const allocator_type &a = allocator_type())
00737         : my_allocator(a)
00738     {}
00739 
00741     concurrent_hash_map( const concurrent_hash_map& table, const allocator_type &a = allocator_type())
00742         : my_allocator(a)
00743     {
00744         internal_copy(table);
00745     }
00746 
00748     template<typename I>
00749     concurrent_hash_map(I first, I last, const allocator_type &a = allocator_type())
00750         : my_allocator(a)
00751     {
00752         reserve( std::distance(first, last) ); // TODO: load_factor?
00753         internal_copy(first, last);
00754     }
00755 
00757     concurrent_hash_map& operator=( const concurrent_hash_map& table ) {
00758         if( this!=&table ) {
00759             clear();
00760             internal_copy(table);
00761         } 
00762         return *this;
00763     }
00764 
00765 
00767     void clear();
00768 
00770     ~concurrent_hash_map() { clear(); }
00771 
00772     //------------------------------------------------------------------------
00773     // Parallel algorithm support
00774     //------------------------------------------------------------------------
00775     range_type range( size_type grainsize=1 ) {
00776         return range_type( *this, grainsize );
00777     }
00778     const_range_type range( size_type grainsize=1 ) const {
00779         return const_range_type( *this, grainsize );
00780     }
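          // Illustrative sketch of traversing the table in parallel using the ranges above
          // (MyMap, table and process() are hypothetical; "tbb/parallel_for.h" must be included):
          //
          //     struct Body {
          //         void operator()( const MyMap::range_type &r ) const {
          //             for( MyMap::range_type::iterator i = r.begin(); i != r.end(); ++i )
          //                 process( i->first, i->second );
          //         }
          //     };
          //     tbb::parallel_for( table.range( 16 ), Body() );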
00781 
00782     //------------------------------------------------------------------------
00783     // STL support - not thread-safe methods
00784     //------------------------------------------------------------------------
00785     iterator begin() {return iterator(*this,0,my_embedded_segment,my_embedded_segment->node_list);}
00786     iterator end() {return iterator(*this,0,0,0);}
00787     const_iterator begin() const {return const_iterator(*this,0,my_embedded_segment,my_embedded_segment->node_list);}
00788     const_iterator end() const {return const_iterator(*this,0,0,0);}
00789     std::pair<iterator, iterator> equal_range( const Key& key ) { return internal_equal_range(key, end()); }
00790     std::pair<const_iterator, const_iterator> equal_range( const Key& key ) const { return internal_equal_range(key, end()); }
00791     
00793     size_type size() const { return my_size; }
00794 
00796     bool empty() const { return my_size == 0; }
00797 
00799     size_type max_size() const {return (~size_type(0))/sizeof(node);}
00800 
00802     allocator_type get_allocator() const { return this->my_allocator; }
00803 
00805     void swap(concurrent_hash_map &table);
00806 
00807     //------------------------------------------------------------------------
00808     // concurrent map operations
00809     //------------------------------------------------------------------------
00810 
00812     size_type count( const Key &key ) const {
00813         return const_cast<concurrent_hash_map*>(this)->lookup(/*insert*/false, key, NULL, NULL, /*write=*/false );
00814     }
00815 
00817 
00818     bool find( const_accessor &result, const Key &key ) const {
00819         result.release();
00820         return const_cast<concurrent_hash_map*>(this)->lookup(/*insert*/false, key, NULL, &result, /*write=*/false );
00821     }
00822 
00824 
00825     bool find( accessor &result, const Key &key ) {
00826         result.release();
00827         return lookup(/*insert*/false, key, NULL, &result, /*write=*/true );
00828     }
00829         
00831 
00832     bool insert( const_accessor &result, const Key &key ) {
00833         result.release();
00834         return lookup(/*insert*/true, key, NULL, &result, /*write=*/false );
00835     }
00836 
00838 
00839     bool insert( accessor &result, const Key &key ) {
00840         result.release();
00841         return lookup(/*insert*/true, key, NULL, &result, /*write=*/true );
00842     }
00843 
00845 
00846     bool insert( const_accessor &result, const value_type &value ) {
00847         result.release();
00848         return lookup(/*insert*/true, value.first, &value.second, &result, /*write=*/false );
00849     }
00850 
00852 
00853     bool insert( accessor &result, const value_type &value ) {
00854         result.release();
00855         return lookup(/*insert*/true, value.first, &value.second, &result, /*write=*/true );
00856     }
00857 
00859 
00860     bool insert( const value_type &value ) {
00861         return lookup(/*insert*/true, value.first, &value.second, NULL, /*write=*/false );
00862     }
00863 
00865     template<typename I>
00866     void insert(I first, I last) {
00867         for(; first != last; ++first)
00868             insert( *first );
00869     }
00870 
00872 
00873     bool erase( const Key& key );
00874 
00876 
00877     bool erase( const_accessor& item_accessor ) {
00878         return exclude( item_accessor, /*readonly=*/ true );
00879     }
00880 
00882 
00883     bool erase( accessor& item_accessor ) {
00884         return exclude( item_accessor, /*readonly=*/ false );
00885     }
00886 
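          // Illustrative usage sketch of the accessor-based operations above (the table
          // instance, key and use() are hypothetical):
          //
          //     tbb::concurrent_hash_map<std::string,int> table;
          //     {
          //         tbb::concurrent_hash_map<std::string,int>::accessor a;
          //         table.insert( a, "hits" );  // creates the element if absent and write-locks it
          //         a->second += 1;             // the lock is held until 'a' is released or destroyed
          //     }
          //     {
          //         tbb::concurrent_hash_map<std::string,int>::const_accessor ca;
          //         if( table.find( ca, "hits" ) )
          //             use( ca->second );      // the element stays read-locked while 'ca' is alive
          //     }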
00887 protected:
00889     bool lookup( bool op_insert, const Key &key, const T *t, const_accessor *result, bool write );
00890 
00892     bool exclude( const_accessor &item_accessor, bool readonly );
00893 
00895     template<typename I>
00896     std::pair<I, I> internal_equal_range( const Key& key, I end ) const;
00897 
00899     void internal_copy( const concurrent_hash_map& source );
00900 
00901     template<typename I>
00902     void internal_copy(I first, I last);
00903 
00905     const_pointer find( const Key& key ) const {
00906         hashcode_t h = my_hash_compare.hash( key );
00907         hashcode_t m = my_mask;
00908     restart:
00909         __TBB_ASSERT((m&(m+1))==0, NULL);
00910         bucket *b = get_bucket( h & m );
00911         if( b->node_list == __TBB_rehash_req ) {
00912             bucket::scoped_t lock;
00913             if( lock.try_acquire( b->mutex, /*write=*/true ) && b->node_list == __TBB_rehash_req )
00914                 const_cast<concurrent_hash_map*>(this)->rehash_bucket( b, h & m ); //recursive rehashing
00915             else internal::spin_wait_while_eq( b->node_list, __TBB_rehash_req ); //TODO: rework for fast find?
00916         }
00917         node *n = search_bucket( key, b );
00918         if( check_mask_race( h, m ) )
00919             goto restart;
00920         if( n )
00921             return &n->item;
00922         return 0;
00923     }
00924 };
00925 
00926 #if _MSC_VER && !defined(__INTEL_COMPILER)
00927     // Suppress "conditional expression is constant" warning.
00928     #pragma warning( push )
00929     #pragma warning( disable: 4127 )
00930 #endif
00931 
00932 template<typename Key, typename T, typename HashCompare, typename A>
00933 bool concurrent_hash_map<Key,T,HashCompare,A>::lookup( bool op_insert, const Key &key, const T *t, const_accessor *result, bool write ) {
00934     __TBB_ASSERT( !result || !result->my_node, NULL );
00935     segment_index_t grow_segment;
00936     bool return_value;
00937     node *n, *tmp_n = 0;
00938     hashcode_t const h = my_hash_compare.hash( key );
00939 #if TBB_USE_THREADING_TOOLS
00940     hashcode_t m = (hashcode_t) itt_load_pointer_with_acquire_v3( &my_mask );
00941 #else
00942     hashcode_t m = my_mask;
00943 #endif
00944     restart:
00945     {//lock scope
00946         __TBB_ASSERT((m&(m+1))==0, NULL);
00947         return_value = false;
00948         // get bucket
00949         bucket_accessor b( this, h & m );
00950 
00951         // find a node
00952         n = search_bucket( key, b() );
00953         if( op_insert ) {
00954             // [opt] insert a key
00955             if( !is_valid(n) ) {
00956                 if( !tmp_n ) {
00957                     if(t) tmp_n = new( my_allocator ) node(key, *t);
00958                     else  tmp_n = new( my_allocator ) node(key);
00959                 }
00960                 if( !b.is_writer() && !b.upgrade_to_writer() ) { // TODO: improved insertion
00961                     // Rerun search_bucket, in case another thread inserted the item during the upgrade.
00962                     n = search_bucket( key, b() );
00963                     if( is_valid(n) ) { // unfortunately, it did
00964                         b.downgrade_to_reader();
00965                         goto exists;
00966                     }
00967                 }
00968                 if( check_mask_race(h, m) )
00969                     goto restart; // b.release() is done in ~b().
00970                 // insert and set flag to grow the container
00971                 grow_segment = insert_new_node( b(), n = tmp_n, m );
00972                 tmp_n = 0;
00973                 return_value = true;
00974             } else {
00975     exists:     grow_segment = 0;
00976             }
00977         } else { // find or count
00978             if( !n ) {
00979                 if( check_mask_race( h, m ) )
00980                     goto restart; // b.release() is done in ~b(). TODO: replace by continue
00981                 return false;
00982             }
00983             return_value = true;
00984             grow_segment = 0;
00985         }
00986         if( !result ) goto check_growth;
00987         // TODO: the following looks like a generic operation that could be factored out
00988         // acquire the item
00989         if( !result->my_lock.try_acquire( n->mutex, write ) ) {
00990             // we are unlucky; prepare for a longer wait
00991             internal::atomic_backoff trials;
00992             do {
00993                 if( !trials.bounded_pause() ) {
00994                     // the wait is taking too long; restart the operation
00995                     b.release();
00996                     __TBB_Yield();
00997                     m = my_mask;
00998                     goto restart;
00999                 }
01000             } while( !result->my_lock.try_acquire( n->mutex, write ) );
01001         }
01002     }//lock scope
01003     result->my_node = n;
01004     result->my_hash = h;
01005 check_growth:
01006     // [opt] grow the container
01007     if( grow_segment )
01008         enable_segment( grow_segment );
01009     if( tmp_n ) // non-null only on the insert path, when the key turned out to already exist
01010         delete_node( tmp_n );
01011     return return_value;
01012 }
01013 
01014 template<typename Key, typename T, typename HashCompare, typename A>
01015 template<typename I>
01016 std::pair<I, I> concurrent_hash_map<Key,T,HashCompare,A>::internal_equal_range( const Key& key, I end ) const {
01017     hashcode_t h = my_hash_compare.hash( key );
01018     hashcode_t m = my_mask;
01019     __TBB_ASSERT((m&(m+1))==0, NULL);
01020     h &= m;
01021     bucket *b = get_bucket( h );
01022     while( b->node_list == __TBB_rehash_req ) {
01023         m = ( 1u<<__TBB_Log2( h ) ) - 1; // get parent mask from the topmost bit
01024         b = get_bucket( h &= m );
01025     }
01026     node *n = search_bucket( key, b );
01027     if( !n )
01028         return std::make_pair(end, end);
01029     iterator lower(*this, h, b, n), upper(lower);
01030     return std::make_pair(lower, ++upper);
01031 }
01032 
01033 template<typename Key, typename T, typename HashCompare, typename A>
01034 bool concurrent_hash_map<Key,T,HashCompare,A>::exclude( const_accessor &item_accessor, bool readonly ) {
01035     __TBB_ASSERT( item_accessor.my_node, NULL );
01036     node_base *const n = item_accessor.my_node;
01037     item_accessor.my_node = NULL; // we ought to release the accessor in any case
01038     hashcode_t const h = item_accessor.my_hash;
01039     hashcode_t m = my_mask;
01040     do {
01041         // get bucket
01042         bucket_accessor b( this, h & m, /*writer=*/true );
01043         node_base **p = &b()->node_list;
01044         while( *p && *p != n )
01045             p = &(*p)->next;
01046         if( !*p ) { // someone else removed the node first
01047             if( check_mask_race( h, m ) )
01048                 continue;
01049             item_accessor.my_lock.release();
01050             return false;
01051         }
01052         __TBB_ASSERT( *p == n, NULL );
01053         *p = n->next; // remove from container
01054         my_size--;
01055         break;
01056     } while(true);
01057     if( readonly ) // need to get exclusive lock
01058         item_accessor.my_lock.upgrade_to_writer(); // return value means nothing here
01059     item_accessor.my_lock.release();
01060     delete_node( n ); // Only one thread can delete it, due to the write lock on the bucket mutex
01061     return true;
01062 }
01063 
01064 template<typename Key, typename T, typename HashCompare, typename A>
01065 bool concurrent_hash_map<Key,T,HashCompare,A>::erase( const Key &key ) {
01066     node_base *n;
01067     hashcode_t const h = my_hash_compare.hash( key );
01068     hashcode_t m = my_mask;
01069     {//lock scope
01070     restart:
01071         // get bucket
01072         bucket_accessor b( this, h & m );
01073     search:
01074         node_base **p = &b()->node_list;
01075         n = *p;
01076         while( is_valid(n) && !my_hash_compare.equal(key, static_cast<node*>(n)->item.first ) ) {
01077             p = &n->next;
01078             n = *p;
01079         }
01080         if( !n ) { // not found, but the mask could have changed
01081             if( check_mask_race( h, m ) )
01082                 goto restart;
01083             return false;
01084         }
01085         else if( !b.is_writer() && !b.upgrade_to_writer() ) {
01086             if( check_mask_race( h, m ) ) // contended upgrade, check mask
01087                 goto restart;
01088             goto search;
01089         }
01090         *p = n->next;
01091         my_size--;
01092     }
01093     {
01094         typename node::scoped_t item_locker( n->mutex, /*write=*/true );
01095     }
01096     // note: there should be no threads attempting to acquire this mutex again; do not try to upgrade const_accessor!
01097     delete_node( n ); // Only one thread can delete it due to write lock on the bucket
01098     return true;
01099 }
01100 
01101 template<typename Key, typename T, typename HashCompare, typename A>
01102 void concurrent_hash_map<Key,T,HashCompare,A>::swap(concurrent_hash_map<Key,T,HashCompare,A> &table) {
01103     std::swap(this->my_allocator, table.my_allocator);
01104     std::swap(this->my_hash_compare, table.my_hash_compare);
01105     internal_swap(table);
01106 }
01107 
01108 template<typename Key, typename T, typename HashCompare, typename A>
01109 void concurrent_hash_map<Key,T,HashCompare,A>::clear() {
01110     hashcode_t m = my_mask;
01111     __TBB_ASSERT((m&(m+1))==0, NULL);
01112 #if TBB_USE_DEBUG || TBB_USE_PERFORMANCE_WARNINGS
01113 #if TBB_USE_PERFORMANCE_WARNINGS
01114     int size = int(my_size), buckets = int(m)+1, empty_buckets = 0, overpopulated_buckets = 0; // usage statistics
01115     static bool reported = false;
01116 #endif
01117     // check consistency
01118     for( segment_index_t b = 0; b <= m; b++ ) {
01119         node_base *n = get_bucket(b)->node_list;
01120 #if TBB_USE_PERFORMANCE_WARNINGS
01121         if( n == __TBB_empty_rehashed ) empty_buckets++;
01122         else if( n == __TBB_rehash_req ) buckets--;
01123         else if( n->next ) overpopulated_buckets++;
01124 #endif
01125         for(; is_valid(n); n = n->next ) {
01126             hashcode_t h = my_hash_compare.hash( static_cast<node*>(n)->item.first );
01127             h &= m;
01128             __TBB_ASSERT( h == b || get_bucket(h)->node_list == __TBB_rehash_req, "Rehashing was not finished before this serial stage because of a concurrent or terminated operation" );
01129         }
01130     }
01131 #if TBB_USE_PERFORMANCE_WARNINGS
01132     if( buckets > size) empty_buckets -= buckets - size;
01133     else overpopulated_buckets -= size - buckets; // TODO: load_factor?
01134     if( !reported && buckets >= 512 && ( 2*empty_buckets >= size || 2*overpopulated_buckets > size ) ) {
01135         internal::runtime_warning(
01136             "Performance is not optimal because the hash function produces bad randomness in lower bits in %s.\nSize: %d  Empties: %d  Overlaps: %d",
01137             typeid(*this).name(), size, empty_buckets, overpopulated_buckets );
01138         reported = true;
01139     }
01140 #endif
01141 #endif//TBB_USE_DEBUG || TBB_USE_PERFORMANCE_WARNINGS
01142     my_size = 0;
01143     segment_index_t s = segment_index_of( m );
01144     __TBB_ASSERT( s+1 == pointers_per_table || !my_table[s+1], "wrong mask or concurrent grow" );
01145     cache_aligned_allocator<bucket> alloc;
01146     do {
01147         __TBB_ASSERT( is_valid( my_table[s] ), "wrong mask or concurrent grow" );
01148         segment_ptr_t buckets = my_table[s];
01149         size_type sz = segment_size( s ? s : 1 );
01150         for( segment_index_t i = 0; i < sz; i++ )
01151             for( node_base *n = buckets[i].node_list; is_valid(n); n = buckets[i].node_list ) {
01152                 buckets[i].node_list = n->next;
01153                 delete_node( n );
01154             }
01155         if( s >= first_block ) // a segment allocated on its own (first_block or later)
01156             alloc.deallocate( buckets, sz );
01157         else if( s == embedded_block && embedded_block != first_block )
01158             alloc.deallocate( buckets, segment_size(first_block)-embedded_buckets );
01159         if( s >= embedded_block ) my_table[s] = 0;
01160     } while(s-- > 0);
01161     my_mask = embedded_buckets - 1;
01162 }
01163 
01164 template<typename Key, typename T, typename HashCompare, typename A>
01165 void concurrent_hash_map<Key,T,HashCompare,A>::internal_copy( const concurrent_hash_map& source ) {
01166     reserve( source.my_size ); // TODO: load_factor?
01167     if( my_mask == source.my_mask ) { // optimized version
01168         for( const_iterator it = source.begin(), end = source.end(); it != end; ++it ) {
01169             bucket *b = get_bucket( it.my_index );
01170             __TBB_ASSERT( b->node_list != __TBB_rehash_req, "Invalid bucket in destination table");
01171             node *n = new( my_allocator ) node(it->first, it->second);
01172             add_to_bucket( b, n );
01173             ++my_size; // TODO: replace by non-atomic op
01174         }
01175     } else internal_copy( source.begin(), source.end() );
01176 }
01177 
01178 template<typename Key, typename T, typename HashCompare, typename A>
01179 template<typename I>
01180 void concurrent_hash_map<Key,T,HashCompare,A>::internal_copy(I first, I last) {
01181     hashcode_t m = my_mask;
01182     for(; first != last; ++first) {
01183         hashcode_t h = my_hash_compare.hash( first->first );
01184         bucket *b = get_bucket( h & m );
01185         __TBB_ASSERT( b->node_list != __TBB_rehash_req, "Invalid bucket in destination table");
01186         node *n = new( my_allocator ) node(first->first, first->second);
01187         add_to_bucket( b, n );
01188         ++my_size; // TODO: replace by non-atomic op
01189     }
01190 }
01191 
01192 template<typename Key, typename T, typename HashCompare, typename A1, typename A2>
01193 inline bool operator==(const concurrent_hash_map<Key, T, HashCompare, A1> &a, const concurrent_hash_map<Key, T, HashCompare, A2> &b) {
01194     if(a.size() != b.size()) return false;
01195     typename concurrent_hash_map<Key, T, HashCompare, A1>::const_iterator i(a.begin()), i_end(a.end());
01196     typename concurrent_hash_map<Key, T, HashCompare, A2>::const_iterator j, j_end(b.end());
01197     for(; i != i_end; ++i) {
01198         j = b.equal_range(i->first).first;
01199         if( j == j_end || !(i->second == j->second) ) return false;
01200     }
01201     return true;
01202 }
01203 
01204 template<typename Key, typename T, typename HashCompare, typename A1, typename A2>
01205 inline bool operator!=(const concurrent_hash_map<Key, T, HashCompare, A1> &a, const concurrent_hash_map<Key, T, HashCompare, A2> &b)
01206 {    return !(a == b); }
01207 
01208 template<typename Key, typename T, typename HashCompare, typename A>
01209 inline void swap(concurrent_hash_map<Key, T, HashCompare, A> &a, concurrent_hash_map<Key, T, HashCompare, A> &b)
01210 {    a.swap( b ); }
01211 
01212 #if _MSC_VER && !defined(__INTEL_COMPILER)
01213     #pragma warning( pop )
01214 #endif // warning 4127 is back
01215 
01216 } // namespace tbb
01217 
01218 #endif /* __TBB_concurrent_hash_map_H */
