tin  1.5.9
transport.h
Go to the documentation of this file.
1 // Copyright (c) 2026 Tinverse LLC. All rights reserved.
2 // SPDX-License-Identifier: LicenseRef-Tinverse-Commercial
3 
11 
12 #pragma once
13 
14 #include <array>
15 #include <cstddef>
16 #include <cstdint>
17 #include <utility>
18 
19 #include "tsm/runtime/transport.h"
20 #include "tsm/ticks.h"
21 
22 namespace tsm::transport {
23 
24 namespace detail {
25 
26 template<typename Serializer>
28 {
29  Serializer::schema_version;
30 };
31 
32 template<typename Serializer>
34 {
35  Serializer::event_id;
36 };
37 
38 } // namespace detail
39 
46 enum class status : std::uint8_t
47 {
48  accepted,
49  empty,
50  full,
51  malformed,
52  rejected,
55  stale_slot,
57 };
58 
65 template<std::size_t MaxBytes>
67 {
68  std::array<std::byte, MaxBytes> bytes{};
69  std::size_t size{};
70 
71  [[nodiscard]] bool assign(std::byte const* data, std::size_t length)
72  {
73  if (length > MaxBytes) {
74  return false;
75  }
76  for (std::size_t i = 0; i < length; ++i) {
77  bytes[i] = data[i];
78  }
79  size = length;
80  return true;
81  }
82 
83  void clear()
84  {
85  size = 0U;
86  }
87 };
88 
95 {
96  std::uint32_t schema_version{ 1U };
97  std::uint16_t event_id{};
98  std::uint16_t flags{};
99 };
100 
106 template<std::size_t MaxBytes>
108 {
110  std::uint32_t sequence{};
112 };
113 
135 template<typename Serializer>
137 {
138  template<std::size_t MaxBytes>
139  [[nodiscard]] static status encode(
140  typename Serializer::event_type const& event,
142  {
143  return Serializer::encode(event, record) ? status::accepted
145  }
146 
147  template<std::size_t MaxBytes>
148  [[nodiscard]] static status decode(serialized_event<MaxBytes> const& record,
149  typename Serializer::event_type& event)
150  {
151  if constexpr (detail::serializer_has_schema_version<Serializer>) {
152  if (record.header.schema_version != Serializer::schema_version) {
154  }
155  }
156  if constexpr (detail::serializer_has_event_id<Serializer>) {
157  if (record.header.event_id != Serializer::event_id) {
158  return status::malformed;
159  }
160  }
161  return Serializer::decode(record, event)
164  }
165 };
166 
173 template<std::size_t MaxBytes>
174 struct m2m_frame
175 {
176  std::uint32_t source_id{};
177  std::uint32_t destination_id{};
178  std::uint32_t sequence{};
179  std::uint16_t kind{};
180  bool crc_ok{ true };
182 };
183 
190 template<std::size_t MaxBytes>
192 {
193  std::uint32_t topic_id{};
194  std::uint32_t writer_id{};
195  std::uint32_t sequence{};
196  bool valid{ true };
198 };
199 
205 template<std::size_t MaxBytes>
207 {
208  std::uint32_t client_id{};
209  std::uint32_t correlation_id{};
210  std::uint16_t method{};
212 };
213 
218 struct rpc_reply
219 {
220  std::uint32_t correlation_id{};
223 };
224 
231 template<std::size_t MaxBytes>
233 {
235  std::uint16_t event_id{};
237 };
238 
247 template<std::size_t MaxBytes>
249 {
250  std::uint32_t channel_id{};
251  std::uint32_t source_id{};
252  std::uint32_t sequence{};
253  std::uint16_t event_id{};
254  bool valid{ true };
256 };
257 
274 template<typename Codec>
276 {
277  template<typename Runtime, std::size_t MaxBytes>
278  [[nodiscard]] static status receive(Runtime& runtime,
279  m2m_frame<MaxBytes> const& frame)
280  {
281  if (!frame.crc_ok) {
282  return status::malformed;
283  }
284 
285  typename Codec::event_type event{};
286  if (!Codec::decode(frame, event)) {
287  return status::malformed;
288  }
290  std::move(event))
293  }
294 };
295 
302 template<typename Codec>
304 {
305  template<typename Runtime, std::size_t MaxBytes>
306  [[nodiscard]] static status receive(Runtime& runtime,
307  dds_sample<MaxBytes> const& sample)
308  {
309  if (!sample.valid) {
310  return status::malformed;
311  }
312 
313  typename Codec::event_type event{};
314  if (!Codec::decode(sample, event)) {
315  return status::malformed;
316  }
318  std::move(event))
321  }
322 };
323 
329 template<typename Codec>
331 {
332  template<typename Runtime, std::size_t MaxBytes>
333  [[nodiscard]] static rpc_reply request(Runtime& runtime,
335  {
336  typename Codec::event_type event{};
337  if (!Codec::decode(request, event)) {
338  return { request.correlation_id, status::malformed, false };
339  }
340 
342  runtime, std::move(event));
343  return { request.correlation_id,
345  handled };
346  }
347 };
348 
354 template<typename Codec>
356 {
357  template<typename Runtime, std::size_t MaxBytes>
358  [[nodiscard]] static status apply(Runtime& runtime,
359  replay_record<MaxBytes> const& record)
360  {
361  typename Codec::event_type event{};
362  if (!Codec::decode(record, event)) {
363  return status::malformed;
364  }
366  std::move(event))
369  }
370 };
371 
380 template<typename Codec>
382 {
383  template<typename Runtime, std::size_t MaxBytes>
384  [[nodiscard]] static status receive(
385  Runtime& runtime,
386  event_io_message<MaxBytes> const& message)
387  {
388  if (!message.valid) {
389  return status::malformed;
390  }
391 
392  typename Codec::event_type event{};
393  if (!Codec::decode(message, event)) {
394  return status::malformed;
395  }
397  std::move(event))
400  }
401 };
402 
411 template<typename Slot, std::size_t Capacity>
413 {
414  static_assert(Capacity > 1U,
415  "tsm: shared_memory_channel requires at least two slots");
416 
417  public:
419  {
420  Slot* slot{};
421  std::uint32_t sequence{};
422  };
423 
425  {
426  Slot const* slot{};
427  std::uint32_t sequence{};
428  };
429 
431  {
432  if (full()) {
433  return status::full;
434  }
435  auto& slot = slots_[write_index_];
436  out = { &slot.value, slot.sequence };
437  return status::accepted;
438  }
439 
440  [[nodiscard]] bool acquire_write(writable_slot& out)
441  {
442  return try_acquire_write(out) == status::accepted;
443  }
444 
445  [[nodiscard]] status try_commit_write(writable_slot const& written)
446  {
447  auto& slot = slots_[write_index_];
448  if (written.slot != &slot.value || slot.ready ||
449  written.sequence != slot.sequence) {
450  return status::stale_slot;
451  }
452  slot.sequence = written.sequence + 1U;
453  slot.ready = true;
454  write_index_ = next(write_index_);
455  ++count_;
456  return status::accepted;
457  }
458 
459  [[nodiscard]] bool commit_write(writable_slot const& written)
460  {
461  return try_commit_write(written) == status::accepted;
462  }
463 
464  [[nodiscard]] status try_acquire_read(readable_slot& out) const
465  {
466  if (empty()) {
467  return status::empty;
468  }
469  auto const& slot = slots_[read_index_];
470  out = { &slot.value, slot.sequence };
471  return status::accepted;
472  }
473 
474  [[nodiscard]] bool acquire_read(readable_slot& out) const
475  {
476  return try_acquire_read(out) == status::accepted;
477  }
478 
479  [[nodiscard]] status try_release_read(readable_slot const& read)
480  {
481  auto& slot = slots_[read_index_];
482  if (read.slot != &slot.value || !slot.ready ||
483  read.sequence != slot.sequence) {
484  return status::stale_slot;
485  }
486  slot.ready = false;
487  read_index_ = next(read_index_);
488  --count_;
489  return status::accepted;
490  }
491 
492  [[nodiscard]] bool release_read(readable_slot const& read)
493  {
494  return try_release_read(read) == status::accepted;
495  }
496 
497  [[nodiscard]] bool empty() const
498  {
499  return count_ == 0U;
500  }
501  [[nodiscard]] bool full() const
502  {
503  return count_ == Capacity;
504  }
505  [[nodiscard]] std::size_t size() const
506  {
507  return count_;
508  }
509 
510  private:
511  struct storage_slot
512  {
513  Slot value{};
514  std::uint32_t sequence{};
515  bool ready{};
516  };
517 
518  static constexpr std::size_t next(std::size_t index)
519  {
520  return (index + 1U) % Capacity;
521  }
522 
523  std::array<storage_slot, Capacity> slots_{};
524  std::size_t write_index_{};
525  std::size_t read_index_{};
526  std::size_t count_{};
527 };
528 
536 template<typename Serializer>
538 {
539  template<typename Runtime, std::size_t MaxBytes, std::size_t Capacity>
540  [[nodiscard]] static status receive_one(
541  Runtime& runtime,
543  {
545  Capacity>::readable_slot readable{};
546  const status acquired = channel.try_acquire_read(readable);
547  if (acquired != status::accepted) {
548  return acquired;
549  }
550 
551  typename Serializer::event_type event{};
552  const status decoded =
553  serializer_adapter<Serializer>::decode(*readable.slot, event);
554  if (decoded != status::accepted) {
555  static_cast<void>(channel.release_read(readable));
556  return decoded;
557  }
558 
560  runtime, std::move(event));
561  static_cast<void>(channel.release_read(readable));
562  return handled ? status::accepted : status::rejected;
563  }
564 };
565 
566 } // namespace tsm::transport
Definition: sync.h:507
Definition: transport.h:413
status try_commit_write(writable_slot const &written)
Definition: transport.h:445
std::size_t size() const
Definition: transport.h:505
bool commit_write(writable_slot const &written)
Definition: transport.h:459
status try_acquire_write(writable_slot &out)
Definition: transport.h:430
status try_release_read(readable_slot const &read)
Definition: transport.h:479
bool full() const
Definition: transport.h:501
bool release_read(readable_slot const &read)
Definition: transport.h:492
bool acquire_write(writable_slot &out)
Definition: transport.h:440
bool acquire_read(readable_slot &out) const
Definition: transport.h:474
bool empty() const
Definition: transport.h:497
status try_acquire_read(readable_slot &out) const
Definition: transport.h:464
requires(!has_transition_type_c< T > &&has_transition_member_c< T >) struct transitions_of< T >
Definition: transition.h:479
concept serializer_has_schema_version
Definition: transport.h:27
concept serializer_has_event_id
Definition: transport.h:33
Definition: transport.h:22
status
Definition: transport.h:47
runtime::Runtime< Definition, Policy, MachinePolicy > Runtime
Definition: runtime.h:35
std::chrono::duration< tick_rep, tick_period > tick_duration
Chrono duration type used for semantic scheduler ticks.
Definition: ticks.h:50
static bool send(Runtime &runtime, Event &&event)
Definition: transport.h:32
Definition: transport.h:304
static status receive(Runtime &runtime, dds_sample< MaxBytes > const &sample)
Definition: transport.h:306
Definition: transport.h:192
bool valid
Definition: transport.h:196
std::uint32_t sequence
Definition: transport.h:195
std::uint32_t writer_id
Definition: transport.h:194
payload_buffer< MaxBytes > payload
Definition: transport.h:197
std::uint32_t topic_id
Definition: transport.h:193
Definition: transport.h:382
static status receive(Runtime &runtime, event_io_message< MaxBytes > const &message)
Definition: transport.h:384
Definition: transport.h:249
bool valid
Definition: transport.h:254
std::uint32_t sequence
Definition: transport.h:252
std::uint32_t channel_id
Definition: transport.h:250
std::uint32_t source_id
Definition: transport.h:251
payload_buffer< MaxBytes > payload
Definition: transport.h:255
std::uint16_t event_id
Definition: transport.h:253
Definition: transport.h:276
static status receive(Runtime &runtime, m2m_frame< MaxBytes > const &frame)
Definition: transport.h:278
Definition: transport.h:175
std::uint32_t destination_id
Definition: transport.h:177
std::uint16_t kind
Definition: transport.h:179
std::uint32_t sequence
Definition: transport.h:178
std::uint32_t source_id
Definition: transport.h:176
bool crc_ok
Definition: transport.h:180
payload_buffer< MaxBytes > payload
Definition: transport.h:181
Definition: transport.h:67
std::size_t size
Definition: transport.h:69
std::array< std::byte, MaxBytes > bytes
Definition: transport.h:68
void clear()
Definition: transport.h:83
bool assign(std::byte const *data, std::size_t length)
Definition: transport.h:71
Definition: transport.h:356
static status apply(Runtime &runtime, replay_record< MaxBytes > const &record)
Definition: transport.h:358
Definition: transport.h:233
payload_buffer< MaxBytes > payload
Definition: transport.h:236
tsm::tick_duration tick
Definition: transport.h:234
std::uint16_t event_id
Definition: transport.h:235
Definition: transport.h:331
static rpc_reply request(Runtime &runtime, rpc_request< MaxBytes > const &request)
Definition: transport.h:333
Definition: transport.h:219
bool event_handled
Definition: transport.h:222
status result
Definition: transport.h:221
std::uint32_t correlation_id
Definition: transport.h:220
Definition: transport.h:207
std::uint16_t method
Definition: transport.h:210
payload_buffer< MaxBytes > payload
Definition: transport.h:211
std::uint32_t client_id
Definition: transport.h:208
std::uint32_t correlation_id
Definition: transport.h:209
Definition: transport.h:108
std::uint32_t sequence
Definition: transport.h:110
serializer_header header
Definition: transport.h:109
payload_buffer< MaxBytes > payload
Definition: transport.h:111
Definition: transport.h:137
static status encode(typename Serializer::event_type const &event, serialized_event< MaxBytes > &record)
Definition: transport.h:139
static status decode(serialized_event< MaxBytes > const &record, typename Serializer::event_type &event)
Definition: transport.h:148
Definition: transport.h:95
std::uint16_t event_id
Definition: transport.h:97
std::uint32_t schema_version
Definition: transport.h:96
std::uint16_t flags
Definition: transport.h:98
Definition: transport.h:538
static status receive_one(Runtime &runtime, shared_memory_channel< serialized_event< MaxBytes >, Capacity > &channel)
Definition: transport.h:540
std::uint32_t sequence
Definition: transport.h:427
Slot const * slot
Definition: transport.h:426
std::uint32_t sequence
Definition: transport.h:421
Target-neutral tick value type.
Local runtime transport boundary for typed events.