Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cpp/csp/adapters/websocket/ClientAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,15 @@ void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
if( m_inputAdapter ) {
m_endpoint -> setOnMessage(
[ this ]( void* c, size_t t ) {
PushBatch batch( m_engine -> rootEngine() );
m_inputAdapter -> processMessage( c, t, &batch );
try
{
PushBatch batch( m_engine -> rootEngine() );
m_inputAdapter -> processMessage( c, t, &batch );
}
catch( csp::Exception & err )
{
pushStatus( StatusLevel::ERROR, ClientStatusType::GENERIC_ERROR, err.what() );
}
}
);
} else {
Expand Down
6 changes: 6 additions & 0 deletions cpp/csp/cppnodes/baselibimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,9 @@ DECLARE_CPPNODE( struct_fromts )
);
}

if( unlikely( !out.get() -> validate() ) )
CSP_THROW( ValueError, "Struct " << cls.value() -> name() << " is not valid; required fields " << out -> formatAllUnsetStrictFields() << " did not tick" );

CSP_OUTPUT( std::move( out ) );
}

Expand Down Expand Up @@ -758,6 +761,9 @@ DECLARE_CPPNODE( struct_collectts )
}
);
}

if( unlikely( !out.get() -> validate( ) ) )
CSP_THROW( ValueError, "Struct " << cls.value() -> name() << " is not valid; some required fields did not tick" );

CSP_OUTPUT( std::move( out ) );
}
Expand Down
7 changes: 7 additions & 0 deletions cpp/csp/engine/InputAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <csp/core/Time.h>
#include <csp/engine/Enums.h>
#include <csp/engine/RootEngine.h>
#include <csp/engine/Struct.h>
#include <csp/engine/TimeSeriesProvider.h>

namespace csp
Expand Down Expand Up @@ -55,6 +56,12 @@ class InputAdapter : public TimeSeriesProvider, public EngineOwned
template<typename T>
bool InputAdapter::consumeTick( const T & value )
{
if constexpr( CspType::Type::fromCType<T>::type == CspType::TypeTraits::STRUCT )
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a central place we can do this check on push rather than consume? Will be much harder to track down on consumption

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not see such a place - we could add it to each adapter type (push, pull, pushpull and managers) separately in their pushTick impls, but I think it's cleaner to have the validation logic in one spot.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but my point stands that it would be a lot harder to tell where the error is coming from
Also unfortunate to vall validate on all input adapters, even python based ones which were already validated due to PyStruct creation in python

{
if( unlikely( !( value -> validate() ) ) )
CSP_THROW( ValueError, "Struct " << value -> meta() -> name() << " is not valid; required fields " << value -> formatAllUnsetStrictFields() << " were not set on init" );
}

switch( pushMode() )
{
case PushMode::LAST_VALUE:
Expand Down
160 changes: 141 additions & 19 deletions cpp/csp/engine/Struct.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
#include <csp/core/System.h>
#include <csp/engine/Struct.h>
#include <algorithm>
#include <ranges>
#include <string>
#include <sstream>

namespace csp
{

StructField::StructField( CspTypePtr type, const std::string & fieldname,
size_t size, size_t alignment ) :
size_t size, size_t alignment, bool isOptional ) :
m_fieldname( fieldname ),
m_offset( 0 ),
m_size( size ),
m_alignment( alignment ),
m_maskOffset( 0 ),
m_noneMaskOffset( 0 ),
m_maskBit( 0 ),
m_noneMaskBit( 0 ),
m_maskBitMask( 0 ),
m_type( type )
m_noneMaskBitMask( 0 ),
m_type( type ),
m_isOptional( isOptional )
{
}

Expand All @@ -33,11 +40,12 @@ and adjustments required for the hidden fields

*/

StructMeta::StructMeta( const std::string & name, const Fields & fields,
std::shared_ptr<StructMeta> base ) : m_name( name ), m_base( base ), m_fields( fields ),
StructMeta::StructMeta( const std::string & name, const Fields & fields, bool isStrict,
std::shared_ptr<StructMeta> base ) : m_name( name ), m_base( base ), m_fields( fields ),
m_size( 0 ), m_partialSize( 0 ), m_partialStart( 0 ), m_nativeStart( 0 ), m_basePadding( 0 ),
m_maskLoc( 0 ), m_maskSize( 0 ), m_firstPartialField( 0 ), m_firstNativePartialField( 0 ),
m_isPartialNative( true ), m_isFullyNative( true )
m_optionalFieldsSetBits( nullptr ), m_optionalFieldsNoneBits( nullptr ), m_isPartialNative( true ),
m_isFullyNative( true ), m_isStrict( isStrict )
{
if( m_fields.empty() && !m_base)
CSP_THROW( TypeError, "Struct types must define at least 1 field" );
Expand Down Expand Up @@ -95,20 +103,70 @@ StructMeta::StructMeta( const std::string & name, const Fields & fields,
//Setup masking bits for our fields
//NOTE we can be more efficient by sticking masks into any potential alignment gaps, dont want to spend time on it
//at this point
m_maskSize = !m_fields.empty() ? 1 + ( ( m_fields.size() - 1 ) / 8 ) : 0;

size_t optionalFieldCount = std::count_if( m_fields.begin(), m_fields.end(), []( const auto & f ) { return f -> isOptional(); } );

m_maskSize = !m_fields.empty() ? 1 + ( ( m_fields.size() + optionalFieldCount - 1 ) / 8 ) : 0;
m_size = offset + m_maskSize;
m_partialSize = m_size - baseSize;
m_maskLoc = m_size - m_maskSize;

uint8_t numRemainingBits = ( m_fields.size() - m_firstPartialField + optionalFieldCount ) % 8;
m_lastByteMask = ( 1u << numRemainingBits ) - 1;

size_t maskLoc = m_maskLoc;
uint8_t maskBit = 0;
for( auto & f : m_fields )

// Set optional fields first so that their 2-bits never cross a byte boundary
m_optionalFieldsSetBits = new uint8_t[ m_maskSize ]();
m_optionalFieldsNoneBits = new uint8_t[ m_maskSize ]();
for( size_t i = 0; i < m_fields.size(); ++i )
{
auto & f = m_fields[ i ];
if( f -> isOptional() )
{
f -> setMaskOffset( maskLoc, maskBit );
f -> setNoneMaskOffset( maskLoc, ++maskBit ); // use adjacent bit for None bit
if( ++maskBit == 8 )
{
m_optionalFieldsSetBits[ maskLoc - m_maskLoc ] = 0xAA;
m_optionalFieldsNoneBits[ maskLoc - m_maskLoc ] = 0x55;
maskBit = 0;
++maskLoc;
}
}
}
// deal with last (partial) byte filled with optional fields
auto lastOptIndex = maskLoc - m_maskLoc;
switch( maskBit / 2 )
{
f -> setMaskOffset( maskLoc, maskBit );
if( ++maskBit == 8 )
case 1:
m_optionalFieldsSetBits[ lastOptIndex ] = 0x1;
m_optionalFieldsNoneBits[ lastOptIndex ] = 0x2;
break;
case 2:
m_optionalFieldsSetBits[ lastOptIndex ] = 0x5;
m_optionalFieldsNoneBits[ lastOptIndex ] = 0xA;
break;
case 3:
m_optionalFieldsSetBits[ lastOptIndex ] = 0x15;
m_optionalFieldsNoneBits[ lastOptIndex ] = 0x2A;
break;
default:
break; // default initialized to 0
}

for( size_t i = 0; i < m_fields.size(); ++i )
{
auto & f = m_fields[ i ];
if( !f -> isOptional() )
{
maskBit = 0;
++maskLoc;
f -> setMaskOffset( maskLoc, maskBit );
if( ++maskBit == 8 )
{
maskBit = 0;
++maskLoc;
}
}
}

Expand All @@ -128,11 +186,22 @@ StructMeta::StructMeta( const std::string & name, const Fields & fields,
if( !rv.second )
CSP_THROW( ValueError, "csp Struct " << name << " attempted to add existing field " << m_fields[ idx ] -> fieldname() );
}

// The complete inheritance hierarchy must agree on strict/non-strict
if( m_base && m_isStrict != m_base -> isStrict() )
{
CSP_THROW( ValueError,
"Struct " << m_name << " was declared " << ( m_isStrict ? "strict" : "non-strict" ) << " but derives from "
<< m_base -> name() << " which is " << ( m_base -> isStrict() ? "strict" : "non-strict" )
);
}
}

StructMeta::~StructMeta()
{
m_default.reset();
delete[] m_optionalFieldsSetBits;
delete[] m_optionalFieldsNoneBits;
}

Struct * StructMeta::createRaw() const
Expand Down Expand Up @@ -453,23 +522,62 @@ void StructMeta::clear( Struct * s ) const
m_base -> clear( s );
}

std::string StructMeta::formatAllUnsetStrictFields( const Struct * s ) const
{
bool first = true;
std::stringstream ss;
ss << "[";

for( size_t i = 0; i < m_fields.size(); ++i )
{
const auto & f = m_fields[ i ];
if( !f -> isSet( s ) && !f -> isNone( s ) )
{
if( !first )
ss << ", ";
else
first = false;
ss << f -> fieldname();
}
}
ss << "]";
return ss.str();
}

bool StructMeta::allFieldsSet( const Struct * s ) const
{
size_t numLocalFields = m_fields.size() - m_firstPartialField;
uint8_t numRemainingBits = numLocalFields % 8;
/*
We can use bit operations to validate the struct.
1. Let M1 be the bitmask with 1s that align with the set bit of optional fields and
2. Let M2 be the bitmask with 1s that align with the none bit of optional fields.
-- Both M1 and M2 are computed trivially when we create the meta.
3. Let M1* = M1 & mask. M1* now is the bitmask of all set optional fields.
4. Similarly, let M2* = M2 & mask, such that M2* is the bitmask of all none optional fields.
5. Let M3 = mask | (M1* << 1) | (M2* >> 1). Since the shifted set/none bitsets will fill in that optional fields other bit,
the struct can validate iff M3 is all 1s.

Notes:
1) We do this on a byte by byte basis currently. If we add 32/64 bit padding to the struct mask, we could do this as one single step for most structs.
2) There is an edge condition if a) the set bit of an optional field is the last bit of a byte or b) the none bit of an optional field is the first bit of a byte.
So, when we create the meta, we ensure this never happens by being smart about the ordering of fields in the mask.
*/

const uint8_t * m = reinterpret_cast<const uint8_t *>( s ) + m_maskLoc;
const uint8_t * e = m + m_maskSize - bool( numRemainingBits );
for( ; m < e; ++m )
const uint8_t * e = m + m_maskSize - bool( m_lastByteMask );

const uint8_t * M1 = m_optionalFieldsSetBits;
const uint8_t * M2 = m_optionalFieldsNoneBits;

for( ; m < e; ++m, ++M1, ++M2 )
{
if( *m != 0xFF )
if( ( *m | ( ( *m & *M1 ) << 1 ) | ( ( *m & *M2 ) >> 1 ) ) != 0xFF )
return false;
}

if( numRemainingBits > 0 )
if( m_lastByteMask )
{
uint8_t bitmask = ( 1u << numRemainingBits ) - 1;
if( ( *m & bitmask ) != bitmask )
uint8_t masked = *m & m_lastByteMask;
if( ( masked | ( ( ( masked & *M1 ) << 1 ) & m_lastByteMask ) | ( ( masked & *M2 ) >> 1 ) ) != m_lastByteMask )
return false;
}

Expand All @@ -494,6 +602,20 @@ void StructMeta::destroy( Struct * s ) const
m_base -> destroy( s );
}

[[nodiscard]] bool StructMeta::validate( const Struct * s ) const
{
if( !s -> meta() -> isStrict() )
return true;

for ( const StructMeta * cur = this; cur; cur = cur -> m_base.get() )
{
if ( !cur -> allFieldsSet( s ) )
return false;
}

return true;
}

Struct::Struct( const std::shared_ptr<const StructMeta> & meta )
{
//Initialize meta shared_ptr
Expand Down
Loading
Loading