mongo-debian/db/record.cpp
2011-12-15 09:35:47 +01:00

231 lines
7 KiB
C++

// record.cpp
#include "pch.h"
#include "pdfile.h"
#include "../util/processinfo.h"
#include "../util/net/listen.h"
namespace mongo {
namespace ps {
enum State {
In , Out, Unk
};
enum Constants {
SliceSize = 65536 ,
MaxChain = 20 , // intentionally very low
NumSlices = 10 ,
RotateTimeSecs = 90
};
int hash( size_t region ) {
return
abs( ( ( 7 + (int)(region & 0xFFFF) )
* ( 11 + (int)( ( region >> 16 ) & 0xFFFF ) )
#if defined(_WIN64) || defined(__amd64__)
* ( 13 + (int)( ( region >> 32 ) & 0xFFFF ) )
* ( 17 + (int)( ( region >> 48 ) & 0xFFFF ) )
#endif
) % SliceSize );
}
/**
* simple hash map for region -> status
* this constitures a single region of time
* it does chaining, but very short chains
*/
class Slice {
struct Entry {
size_t region;
unsigned long long value;
};
public:
Slice() {
reset();
}
void reset() {
memset( _data , 0 , SliceSize * sizeof(Entry) );
}
State get( int regionHash , size_t region , short offset ) {
DEV assert( hash( region ) == regionHash );
Entry * e = _get( regionHash , region , false );
if ( ! e )
return Unk;
return ( e->value & ( ((unsigned long long)1) << offset ) ) ? In : Out;
}
/**
* @return true if added, false if full
*/
bool in( int regionHash , size_t region , short offset ) {
DEV assert( hash( region ) == regionHash );
Entry * e = _get( regionHash , region , true );
if ( ! e )
return false;
e->value |= ((unsigned long long)1) << offset;
return true;
}
private:
Entry* _get( int start , size_t region , bool add ) {
for ( int i=0; i<MaxChain; i++ ) {
int bucket = ( start + i ) % SliceSize;
if ( _data[bucket].region == 0 ) {
if ( ! add )
return 0;
_data[bucket].region = region;
return &_data[bucket];
}
if ( _data[bucket].region == region ) {
return &_data[bucket];
}
}
return 0;
}
Entry _data[SliceSize];
};
/**
* this contains many slices of times
* the idea you put mem status in the current time slice
* and then after a certain period of time, it rolls off so we check again
*/
class Rolling {
public:
Rolling()
: _lock( "ps::Rolling" ){
_curSlice = 0;
_lastRotate = Listener::getElapsedTimeMillis();
}
/**
* after this call, we assume the page is in ram
* @param doHalf if this is a known good access, want to put in first half
* @return whether we know the page is in ram
*/
bool access( size_t region , short offset , bool doHalf ) {
int regionHash = hash(region);
SimpleMutex::scoped_lock lk( _lock );
static int rarely_count = 0;
if ( rarely_count++ % 2048 == 0 ) {
long long now = Listener::getElapsedTimeMillis();
RARELY if ( now == 0 ) {
tlog() << "warning Listener::getElapsedTimeMillis returning 0ms" << endl;
}
if ( now - _lastRotate > ( 1000 * RotateTimeSecs ) ) {
_rotate();
}
}
for ( int i=0; i<NumSlices / ( doHalf ? 2 : 1 ); i++ ) {
int pos = (_curSlice+i)%NumSlices;
State s = _slices[pos].get( regionHash , region , offset );
if ( s == In )
return true;
if ( s == Out ) {
_slices[pos].in( regionHash , region , offset );
return false;
}
}
// we weren't in any slice
// so add to cur
if ( ! _slices[_curSlice].in( regionHash , region , offset ) ) {
_rotate();
_slices[_curSlice].in( regionHash , region , offset );
}
return false;
}
private:
void _rotate() {
_curSlice = ( _curSlice + 1 ) % NumSlices;
_slices[_curSlice].reset();
_lastRotate = Listener::getElapsedTimeMillis();
}
int _curSlice;
long long _lastRotate;
Slice _slices[NumSlices];
SimpleMutex _lock;
} rolling;
}
bool Record::MemoryTrackingEnabled = true;
volatile int __record_touch_dummy = 1; // this is used to make sure the compiler doesn't get too smart on us
void Record::touch( bool entireRecrd ) {
if ( lengthWithHeaders > HeaderSize ) { // this also makes sure lengthWithHeaders is in memory
char * addr = data;
char * end = data + netLength();
for ( ; addr <= end ; addr += 2048 ) {
__record_touch_dummy += addr[0];
break; // TODO: remove this, pending SERVER-3711
if ( ! entireRecrd )
break;
}
}
}
bool Record::likelyInPhysicalMemory() {
if ( ! MemoryTrackingEnabled )
return true;
static bool blockSupported = ProcessInfo::blockCheckSupported();
const size_t page = (size_t)data >> 12;
const size_t region = page >> 6;
const size_t offset = page & 0x3f;
if ( ps::rolling.access( region , offset , false ) )
return true;
if ( ! blockSupported )
return false;
return ProcessInfo::blockInMemory( data );
}
Record* Record::accessed() {
const size_t page = (size_t)data >> 12;
const size_t region = page >> 6;
const size_t offset = page & 0x3f;
ps::rolling.access( region , offset , true );
return this;
}
}