gst-launch를 사용하여 커맨드라인으로 원하는 파이프라인을 실행시킬 수도 있지만, GStreamer API 구성 요소를 사용하여 명령줄로 구현했던 파이프라인의 기능을 똑같이 동작시키는 하나의 응용 어플리케이션 형태로도 만들수 있다.

GStreamer는 기본적으로 GLib의 GObject를 기반으로 설계되었기 때문에 해당 개념을 어느정도 이해하고 있어야 GStreamer를 능숙하게 다룰 수 있지만, 이는 단순히 기능을 사용하는것이 스케줄링, 동기화등의 고급 기능 혹은 플러그인을 제작하려고 할 때 중요하게 사용되는 개념이므로 간단한 예제를 이해하는데는 크게 중요하지 않다.

color bars 테스트 영상을 띄우기 위해 어플리케이션내에서 2가지 방법으로 파이프라인을 구축해볼것이다

gst-launch-1.0 videotestsrc pattern=0 ! autovideosink

string기반 파이프라인

#include <gst/gst.h>
#include <iostream>

int main(int argc, char *argv[]) {
  GstElement *pipeline;
  GstBus *bus;
  GstMessage *msg;

  gst_init (&argc, &argv);

  pipeline = gst_parse_launch("videotestsrc pattern=0 ! autovideosink", NULL);

  gst_element_set_state (pipeline, GST_STATE_PLAYING);

  bus = gst_element_get_bus (pipeline);
  msg = gst_bus_timed_pop_filtered (bus, GST_CLOCK_TIME_NONE, GST_MESSAGE_ERROR | GST_MESSAGE_EOS);

  if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) {
    std::cerr << "An error occurred! Re-run with the GST_DEBUG=*:WARN environment variable set for more details\n";
  }

  gst_message_unref (msg);
  gst_object_unref (bus);
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (pipeline);

  return 0;
}

GStreamer를 어플리케이션에서 사용하는 과정을 요약하면 아래와 같다.

  1. GStreamer API 초기화
  2. 파이프라인 정의
  3. 파이프라인 시작

GStreamer의 기능들을 사용하기전 gst_init()을 호출하여 초기화하고, 플러그인들을 로드한다. 기본적으로 초기화 실패시 프로그램이 종료되므로 초기화하지 못하는 경우를 fallback하려면 gst_init_check()를 호출하여 초기화 여부를 알아낼 수 있다.

다음은 파이프라인을 정의하는 부분이다. GStreamer 파이프라인을 어플리케이션에서 구축하는 방법은 크게 2가지가 있다.

  1. string기반으로 파이프라인 구축
  2. 개별 element를 각각 조합하여 구축

위 예제는 간단한 파이프라인의 구조를 사용하고 별도의 고급기능을 사옹하지 않기 때문에 string기반으로 파이프라인을 구축한다. 이는 gst_parse_launch라는 함수를 이용하여 수행된다.

사용하는 방법은 CLI도구인 gst-launch-1.0을 사용하는 방법과 똑같다. 다만 string기반으로 파이프라인을 구축할 경우 어플리케이션 내에서 특정 element의 property를 바꾸고싶을 때 해당 element의 name property로 접근하여 pipeline에서 분리해내는 과정이 필요하다.

다음은 파이프라인을 시작시키는 단계로 element의 상태를 변경함으로써 수행된다. pipeline도 하나의 GstElement이기때문에 gst_element_set_state를 통해 GST_STATE_PLAYING을 전달하여 파이프라인을 실행시킨다.

아래의 명령어를 통해 컴파일을 하고 실행시키면 color bars가 재생되는것을 볼 수 있다.

gcc basic-tutorial-1.c -o basic-tutorial-1 $(pkg-config --cflags --libs gstreamer-1.0)
Unacceptable TLS certificate

실행시켰는데 위 처럼 TLS 관련 에러가 발생하면 인증서 문제가 발생한 것이므로 아래 명령어를 통해 인증서를 설치 후 해결할 수 있다.

sudo apt-get install ca-certificates

조립형 파이프라인

이번에는 문자열기반의 파이프라인이 아닌 개별 요소를 직접 연결하는 방식으로 파이프라인을 구축해볼 것이다.

#include <gst/gst.h>
#include <iostream>

int main(int argc, char *argv[]) {
  GstElement *pipeline, *source, *sink;
  GstBus *bus;
  GstMessage *msg;
  GstStateChangeReturn ret;

  gst_init (&argc, &argv);

  source = gst_element_factory_make ("videotestsrc", "source");
  sink = gst_element_factory_make ("autovideosink", "sink");

  pipeline = gst_pipeline_new ("test-pipeline");

  if (!pipeline || !source || !sink) {
    std::cerr << "Not all elements could be created.\n";
    return -1;
  }

  gst_bin_add_many (GST_BIN (pipeline), source, sink, NULL);
  if (gst_element_link (source, sink) != TRUE) {
    std::cerr << "Elements could not be linked.\n";
    gst_object_unref (pipeline);
    return -1;
  }

  g_object_set (source, "pattern", 0, NULL);

  ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE) {
    std::cerr <<"Unable to set the pipeline to the playing state.\n";
    gst_object_unref (pipeline);
    return -1;
  }

  bus = gst_element_get_bus (pipeline);
  msg = gst_bus_timed_pop_filtered (bus, GST_CLOCK_TIME_NONE, GST_MESSAGE_ERROR | GST_MESSAGE_EOS);

  if (msg != NULL) {
    GError *err;
    gchar *debug_info;

    switch (GST_MESSAGE_TYPE (msg)) {
      case GST_MESSAGE_ERROR:
        gst_message_parse_error (msg, &err, &debug_info);
        g_printerr ("Error received from element %s: %s\n", GST_OBJECT_NAME (msg->src), err->message);
        g_printerr ("Debugging information: %s\n", debug_info ? debug_info : "none");
        g_clear_error (&err);
        g_free (debug_info);
        break;
      case GST_MESSAGE_EOS:
        g_print ("End-Of-Stream reached.\n");
        break;
      default:
        /* We should not reach here because we only asked for ERRORs and EOS */
        g_printerr ("Unexpected message received.\n");
        break;
    }
    gst_message_unref (msg);
  }

  gst_object_unref (bus);
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (pipeline);
  return 0;
}

위의 예제와 다르게 string으로 전체 파이프라인을 구축하지 않고 개별의 element를 만들고 각 요소를 연결하여 파이프라인을 생성하고 있다.

일반적으로 element는 gst_element_factory_make를 사용하여 생성한다. 첫 번째 파라미터로 생성할 element의 고유한 이름을, 두 번째 파라미터로 별도의 name을 정해줄 수 있는데, 나중에 전체 파이프라인에서 원하는 element를 찾거나 할 때 name이 사용된다.

gst_pipeline_new를 사용해 빈 파이프라인을 생성하고 gst_bin_add_many를 호출하여 각 element를 파이프라인에 추가한다. 그리고 gst_element_link를 최종적으로 호출하여 element를 연결해주는 작업을 해준다. 이때 중요한 점은 data flow에 따라 link 순서를 설정해주어야 한다. 아래 그림처럼 source에서 sink의 흐름이 되어야한다.

GstElement는 GObject를 상속하기때문에 g_object_set을 호출하여 각 element의 속성을 수정할 수 있다. NULL을 종료 문자열로 인식하므로 여러 속성을 한번에 정의할 수도 있다.

첫 번째 예제에서는 gst_parse_launch를 통해 간단하게 파이프라인을 생성한 반면에 두 번째 방법은 다소 번거로운 작업처럼 보인다. 하지만 프로젝트 규모나 상황에 따라 두 번째 방법처럼 개별 요소를 각각 정의하는 방법이 구조화된 측면에서는 더욱 좋은 방법이라고 생각이 든다.

Producer, Consumer

단순히 파이프라인을 실행시키는것 말고 source로부터 흐르는 video, audio 데이터들을 직접 다루려면 어떻게 해야할까? 예를들어 특정 영상 파일 혹은 rtsp 소스로부터 비디오 스트림을 받고 OpenCV로 객체 검출을 하고 싶다고 가정해보자. 그러기위해서는 파이프라인으로부터 각 프레임마다 이미지 바이트를 전달받을 수 있어야한다.

이 같은 상황을 해결하기 위해 GStreamer는 appsrc, appsink 플러그인을 제공한다. appsrc는 파이프라인에 데이터를 전달해주기 위해, appsink는 반대로 파이프라인에서 application에 데이터를 전달해주는 역할을 한다.

이번에는 color bars예제를 appsrc, appsink를 이용해 producer, consumer 관점으로 만들어볼 것이다.

producer

생산자는 source입장에서 파일을 읽거나 (filesrc), IP카메라로부터 영상을 받는(rtspsrc) 등의 특정 영상을 생성해서 application으로 각 이미지를 전달하는 역할을 한다.

Producer::Producer() {
  GError *err{nullptr};
  std::string pipelineStr = "videotestsrc ! video/x-raw, width=1280, height=720, format=RGB ! videoconvert ! appsink name=mysink";
  std::cout << "producer pipeline: " << pipelineStr << std::endl;

  m_pipeline = gst_parse_launch(pipelineStr.c_str(), &err);
  if (err != nullptr) {
    std::cerr << "failed to construct pipeline (err: " << err->message << ")\n";
    exit(1);
  }

  GstPipeline *pl = GST_PIPELINE(m_pipeline);
  if (!pl) {
    std::cerr << "failed to construct pipeline (err: " << err->message << ")\n";
    exit(1);
  }

  m_appsink = GST_APP_SINK(gst_bin_get_by_name(GST_BIN(m_pipeline), "mysink"));
}
void Producer::produce(unsigned char **dataPtr) {
  GstSample *sample;
  GstBuffer *buffer;
  GstMapInfo map;

  sample = gst_app_sink_pull_sample(GST_APP_SINK(m_appsink));
  if (!sample) {
    return;
  }

  buffer = gst_sample_get_buffer(sample);
  if (!buffer) {
    return;
  }

  if (!gst_buffer_map(buffer, &map, GST_MAP_READ)) {
    return;
  }

  const void *gstData = map.data;
  const gsize gstSize = map.size;
  auto size = gstSize;
  if (!gstData) {
    gst_sample_unref(sample);
    return;
  }

  memcpy(*dataPtr, gstData, size);

  // free gst buffer
  gst_buffer_unmap(buffer, &map);
  gst_sample_unref(sample);

  return;
}

consumer

소비자는 각 처리된 이미지 스트림을 파이프라인으로 전달하여 파일로 쓰거나 혹은 RTSP 스트림으로 전달하는 등의 sink 역할을 한다.

Consumer::Consumer(int size) : m_size(size) {
  GError *err{nullptr};
  std::string pipelineStr = "appsrc name=mysrc ! videoconvert ! autovideosink";
  //std::string pipelineStr = "appsrc name=mysrc ! videoconvert  ! x264enc ! qtmux ! filesink location=test.mp4";
  std::cout << "consumer pipeline: " << pipelineStr << std::endl;

  m_pipeline = gst_parse_launch(pipelineStr.c_str(), &err);
  if (err != nullptr) {
    std::cerr << "failed to construct pipeline (err: " << err->message << ")\n";
    exit(1);
  }

  GstPipeline *pl = GST_PIPELINE(m_pipeline);
  if (!pl) {
    std::cerr << "failed to construct pipeline (err: " << err->message << ")\n";
    exit(1);
  }
  m_appsrc = GST_APP_SRC(gst_bin_get_by_name(GST_BIN(m_pipeline), "mysrc"));

  std::string caps("video/x-raw, width=1280, height=720, format=RGB");

  // set the capability on the appsrc 
  gst_app_src_set_caps(GST_APP_SRC(m_appsrc), gst_caps_from_string(caps.c_str()));
}
void Consumer::consume(unsigned char **dataPtr) {
  GstFlowReturn fret;

  GstBuffer *gstBuffer = gst_buffer_new_allocate(nullptr, m_size, nullptr);

  // map buffer for writing (image)
  GstMapInfo map;
  if (gst_buffer_map(gstBuffer, &map, GST_MAP_WRITE)) {
    if (map.size != m_size) {
      gst_buffer_unref(gstBuffer);
      return;
    }
    memcpy(map.data, *dataPtr, m_size);
    gst_buffer_unmap(gstBuffer, &map);
  } else {
    gst_buffer_unref(gstBuffer);
    return;
  }

  g_signal_emit_by_name(m_appsrc, "push-buffer", gstBuffer, &fret);
  gst_buffer_unref(gstBuffer);
  if (fret != GST_FLOW_OK) {
    std::cerr << "video appsrc pushed buffer abnormally (result: " << gst_flow_get_name(fret) << ")\n";
  }

  return;
}

전체 코드는 여기에서 확인할 수 있다.

Reference