Program Listing for File write_to_file.hpp#

Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/stages/write_to_file.hpp)

/*
 * SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "morpheus/export.h"
#include "morpheus/messages/meta.hpp"
#include "morpheus/objects/file_types.hpp"

#include <boost/fiber/context.hpp>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <pymrc/node.hpp>
#include <rxcpp/rx.hpp>

#include <fstream>
#include <functional>  // for function
#include <memory>
#include <string>
#include <thread>

namespace morpheus {
/****** Component public implementations *******************/
/****** WriteToFileStage********************************/

class MORPHEUS_EXPORT WriteToFileStage
  : public mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<MessageMeta>>
{
  public:
    using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageMeta>, std::shared_ptr<MessageMeta>>;
    using typename base_t::sink_type_t;
    using typename base_t::source_type_t;
    using typename base_t::subscribe_fn_t;

    WriteToFileStage(const std::string& filename,
                     std::ios::openmode mode = std::ios::out,
                     FileTypes file_type     = FileTypes::Auto,
                     bool include_index_col  = true,
                     bool flush              = false);

  private:
    void close();

    void write_json(sink_type_t& msg);

    void write_csv(sink_type_t& msg);

    void write_parquet(sink_type_t& msg);

    subscribe_fn_t build_operator();

    bool m_is_first{};
    bool m_include_index_col;
    bool m_flush;
    std::ofstream m_fstream;
    std::function<void(sink_type_t&)> m_write_func;
};

/****** WriteToFileStageInterfaceProxy******************/
struct MORPHEUS_EXPORT WriteToFileStageInterfaceProxy
{
    static std::shared_ptr<mrc::segment::Object<WriteToFileStage>> init(mrc::segment::Builder& builder,
                                                                        const std::string& name,
                                                                        const std::string& filename,
                                                                        const std::string& mode = "w",
                                                                        FileTypes file_type     = FileTypes::Auto,
                                                                        bool include_index_col  = true,
                                                                        bool flush              = false);
};  // end of group
}  // namespace morpheus