Коннектор MoexFixFastCurrency включен в состав платформы OsEngine.
Классы коннектора расположены в папке MoexFixFastCurrency. Всё с открытым кодом. Вы можете посмотреть исходники на ГитХаб здесь.
Или внутри проекта здесь:
Код коннектора распределен в 11 регионах, согласно стандартам OsEngine:
При запуске подключения считываются введенные пользователем настройки:
В первом регионе происходит создание необходимых потоков, подключений к серверам, перевод коннектора в статус Connect.
Из-за специфики протокола FIX/FAST и особенностей его применения были созданы 7 параллельных потоков, которые нужны для решения коннектором двух главных задач:
1. Получать данные.
2. Совершать транзакции.
Данные по торгуемым инструментам, обезличенным сделкам, заявкам в стакане мы получаем через сокеты по протоколу UDP в формате FAST, подключаясь к платформе MOEX Market Data Multicast FIX/FAST. Для декодирования сообщений используется библиотека OpenFast и специальный xml файл – шаблон. В связи с тем, что протокол UDP не гарантирует доставку пакетов до клиента, данные во всех UDP-потоках распространяются в двух экземплярах (A и B) на двух разных multicast-адресах.
Кроме того, по обезличенным сделкам и ордерам потоки данных делятся ещё на инкрементальные данные (обновляемые в реальном времени сразу, как появились) и, так называемые, снэпшоты (массив исторических данных, собранный с начала текущего торгового дня), которые используются для восстановления пропущенных данных. Отдельно восстановить данные можно, запросив их по TCP соединению.
Для совершения сделок и получения информации о них используется сокет с протоколом TCP/IP, по которому идет обмен сообщениями в FIX формате.
Потоки в коннекторе распределены по типу поступающей информации:
Поток 1 – «GetterSecurity» получает информацию по доступным инструментам (Название, код, режим, стоимость шага цены, лотность и т.д.). В документации к платформе MOEX Market Data Multicast FIX/FAST относительно инструментов указано, что финансовый инструмент с кодом «SecCode» может быть доступен для торгов в разных режимах. Вы должны рассматривать комбинацию Symbol (55) + TradingsessionId (336) как отдельный инструмент с отдельными котировками и таблицами сделок и заявок. То есть, если на валютном рынке доступно 17 режимов, коды которых передаются в тэге TradingsessionId, у каждого режима может быть инструмент с одинаковым названием, например, CNYRUB_TOM. Этот нюанс пришлось учесть в регионе 7 Security subscrible, создавая уникальное имя инструмента, чтобы не получить ошибку, торгуя в разных режимах:
Также в этом регионе в целях снижения нагрузки на сетевую инфраструктуру и экономии ресурсов учитывается время подписки на инструмент. Если подписываемся на инструмент до начала торгов, то подключаемся только к сокетам для получения инкрементальных данных по трейдам и ордерам. Не создаются сокеты для восстановления данных, объекты нескольких классов и некоторые списки:
Поток 2 – «GetterTrades» получает FAST сообщения, содержащие инкрементальные данные по трейдам и, если необходимо, снэпшоты трейдов, проверяет пропуски номеров и отправляет в очередь для разбора только сообщения, касающиеся инструментов, выбранных для торговли.
Поток 3 – «TradesReaderFromQueue» разбирает очередь с сообщениями. Если пришли инкрементальные данные, проверяет пропуски в номерах по конкретному инструменту, инициализирует восстановление по ТСР, извлекает из сообщения данные по сделке и передает их в систему в виде экземпляра класса Trade. Если из очереди пришел фрагмент снэпшота (чем больше времени прошло с начала торгов, тем больше фрагментов содержит снэпшот) создается экземпляр класса SnapshotFragment, в котором хранится информация о нем и в том числе список трейдов. Когда снэпшот сформирован из достаточного количества фрагментов, все трейды поступают в систему, восстановление завершается и Поток 2 перестает слушать сокеты снэпшотов.
Поток 4 – «GetterOrders» получает FAST сообщения, содержащие инкрементальные данные по ордерам. Функционал аналогичен Потоку 2.
Поток 5 – «OrdersReaderFromQueue» разбирает очередь с сообщениями, необходимыми для формирования стакана: инкрементальные данные и снэпшоты. Но, чтобы получить актуальный стакан, потребуются все сообщения по инструменту с начала торгового дня, поскольку объем по определенной цене формируется из заявок, которые могут добавляться, изменяться и удаляться, и всех их надо хранить.
Изменения заявок хранятся в словаре в виде экземпляров класса OrderChange.
Ключевое свойство класса - MDEntryID – идентификатор заявки. По нему мы находим заявки и, в зависимости от предписанного действия в свойстве Action, добавляем, изменяем или удаляем заявку.
Но, если мы подключились к серверу после начала торгов, первое отображение стакана в терминале происходит после обработки заявок, содержащихся в снэпшоте. Эти заявки не содержат действий, а только идентификатор, цену, тип (bid или ask) и объем. При обработке фрагментов снэпшота заявки заранее сохраняются в список в виде экземпляров класса MarketDepthLevel и после формирования снэпшота в методе MakeFirstDepth обрабатываются путём сложения всех объемов, принадлежащих одной цене и типу:
Далее стакан обновляется из инкрементальных данных.
Поток 6 – «HistoricalReplayMoexFixFastCurrency» нужен для восстановления пропущенных инкрементальных данных по трейдам или ордерам по протоколу ТСР.
Подключение к сокету происходит только тогда, когда из потоков обработки очередей трейдов или ордеров придет подтверждение пропуска данных. Чтобы провести эту проверку, сначала в словарь сохраняем экземпляр класса NumbersData, у которого только два свойства MsgSeqNum – это порядковый номер FAST сообщения и RptSeq и порядковый номер обновления по конкретному инструменту. Чтобы не восстанавливать данные по ненужным инструментам, проверяются пропуски только в RptSeq и, если пропущено 5 и более сообщений, потоку восстановления подается диапазон соответствующих им номеров MsgSeqNum. За это отвечает метод IsDataMissed.
После подключения к сокету отправляем серверу FIX сообщение с указанием типа запрашиваемых данных, номера первого и последнего сообщения из нужного диапазона.
Сервер присылает массив FAST сообщений, из которого мы берем только пропущенные по подписанному инструменту и кладем их в соответствующую очередь для дальнейшей обработки в Потоке 3 или Потоке 5.
Поток 7 – «MFIXTradeServerProcessing» нужен для обработки FIX сообщений, связанных с торговыми операциями, а также контроля связи с сервером MFIX Transactional.
Для установления связи с сервером используются классы TcpClient и NetworkStream стандартной библиотеки System.Net
За формирование сообщений отвечает класс MessageConstructor, в методах которого с помощью класса StringBuilder соединяются пары «тэг=значение», и возвращаются строки для отправки на сервер.
В бесконечном цикле Потока 7 свойство объекта _fxMFIXTradeStream - DataAvailable возвращает значение, указывающее, имеются ли в объекте класса NetworkStream данные, доступные для чтения. Если имеются, вызывается главный метод парсинга FIX сообщений FixMessageReader. В нем сообщение разбивается на пары «тэг=значение» и помещается в словарь Dictionary для быстрого доступа к значению по номеру тэга. В зависимости от типа пришедшего сообщения происходит дальнейшая обработка.
Контроль соединения с сервером осуществляется с помощью таймера, который запускается после последнего FIX сообщения. Если сервер не прислал Hearbeat в течение определенного времени, отправляется сообщение Test Request и, если оно осталось без ответа, подключение разрывается. Со стороны сервера проводится такая же проверка связи с коннектором.
Итак, я обозначил основные моменты работы коннектора с учетом документации к платформе MOEX Market Data Multicast FIX/FAST и сервису MFIX Transactional. Пройдены все необходимые автотесты платформы OsEngine, а также сертификация в Московской бирже.
Чтобы ознакомиться с кодом коннектора подробнее, можно скачать OsEngine с публичного репозитория в GitHub по адресу: https://github.com/AlexWan/OsEngine и открыть его в программе Visual Studio.
Поддержка OsEngine: https://t.me/osengine_official_support
Комментарии