By Oleksiy Grechnyev, CV/ML engineer @It-Jim
30

What MediaPipe Really Is: a C++ Mini-Tutorial

Basic concepts

As we already explained, MediaPipe is a C++ pipeline library. It is very poorly documented, basically, the only documentation is the comments and docstrings in the MP source code. There are also examples, but they are not very readable. There is only one trivial “hello world” example, the rest is deep learning, which is counterproductive for learning basic MP concepts.  Moreover, these examples are artificially obscured by things like GLog and GFlags. So we had to learn MP the hard way while dealing with the Bazel issues.

As a result, we wrote the following tutorial: https://github.com/agrechnev/first_steps_mediapipe. It gives a gentle introduction to the basic MediaPipe C++ API (no deep learning or solutions). Below we give a very brief summary of this tutorial, see the actual code for more details.

The core MP concepts (unlike the C++ API) are pretty well explained in the official MP docs. The basic terminology:

  • Packet: An immutable data packet of an arbitrary type (with a timestamp). MP also has standard types for image and audio.
  • Graph: The pipeline, represented as a graph.
  • Node: A node of the graph, which processes data.
  • Stream: Graph edge, a stream of packets with monotonously increasing timestamps.
  • Calculator: A registered class for creating nodes.

First example

How does it work in practice? Let’s look at our first example 1.1. It deals with packets of doubles (using more complicated types, like images would be very wrong for first examples). Let’s define a very simple graph, as a Protobuf text string:

string protoG = R"(
    input_stream: "in"
    output_stream: "out"
    node {
        calculator: "PassThroughCalculator"
        input_stream: "in"
        output_stream: "out1"
    }
    node {
        calculator: "PassThroughCalculator"
        input_stream: "out1"
        output_stream: "out"
    }
    )";

It has two nodes of PassThroughCalculator. What does it do? Basically nothing, it forwards all input data packets to the output. The graph has input stream in, output stream out, and there is one more stream out1 in the middle. The graph looks like this (visualized by the MP visualizer):

Next, we parse the config and create our graph.

mediapipe::CalculatorGraphConfig config =
  mediapipe::ParseTextProtoOrDie<mediapipe::CalculatorGraphConfig>(protoG);
mediapipe::CalculatorGraph graph;
MP_RETURN_IF_ERROR(graph.Initialize(config));

Next, we should add an observer to process output packets of a graph asynchronously (synchronous processing is also possible if needed). Then we start running the graph:

auto cb = [](const mediapipe::Packet &packet)->mediapipe::Status{
  cout << packet.Timestamp() << ": RECEIVED " << packet.Get<double>() << endl;
  return mediapipe::OkStatus();
}
MP_RETURN_IF_ERROR(graph.ObserveOutputStream("out", cb));
MP_RETURN_IF_ERROR(graph.StartRun({}));

At this point, the graph starts running. It is now waiting for the input packets. But wait, we did not supply any! This is what we do next. The packet is sort of like an immutable shared_ptr<any>, plus a timestamp. It can hold data of any type. The timestamps in a stream must increase monotonously. Of course, they don’t have to be absolute timestamps since the epoch. Let’s send a few double packets, then “close the stream” to tell MP that no more packets are coming.

for (int i=0; i<13; ++i) {
  mediapipe::Timestamp ts(i);
  mediapipe::Packet packet = mediapipe::MakePacket<double>(i*0.1).At(ts);
  MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("in", packet));
}
graph.CloseInputStream("in");

Adding the timestamp is crucial, MP will not work otherwise! Now let us wait for MP to process all packets and finish.

MP_RETURN_IF_ERROR(graph.WaitUntilDone());
return mediapipe::OkStatus();

That’s it, we are done!

Writing a custom calculator

Let us now write a custom calculator (example 1.2). Our calculator will multiply a double number by 2, aka “double the double”. A custom calculator must be defined in the mediapipe namespace and registered with the REGISTER_CALCULATOR() macro. After that MediaPipe finds the calculator by name (as specified in the Protobuf graph description), there is no need to import any header for the calculator class. 

Every calculator must implement the static method GetContract() to describe inputs and outputs (streams in MP can have numbers, string tags, or both); and implement the method Process() which process each incoming packet (or, in, general, a synchronized bunch of packets with the same timestamp). Methods Open() and Close() are typically also overridden. The code for “double the double” calculator is:

namespace mediapipe{
class GoblinCalculator12 : public CalculatorBase {
public:
static Status GetContract(CalculatorContract *cc) {
  using namespace std;
  cc->Inputs().Index(0).Set<double>; 	// 1 double input
  cc->Outputs().Index(0).Set<double>;	// 1 double output
  return OkStatus();                   	// Never forget to say "OK" !
}

Status Process(CalculatorContext *cc) override {
  using namespace std;
  Packet pIn = cc->Inputs().Index(0).Value();	// Receive the input packet
  double x = pIn.Get<double>();     	// Extract the double number
  double y = x * 2;                        	// Process the number
  Packet pOut = MakePacket<double>(y).At(cc->InputTimestamp()); // Create packet
  cc->Outputs().Index(0).AddPacket(pOut);  // Send it to the output stream
  return OkStatus();             	// Never forget to say "OK" !
}
REGISTER_CALCULATOR(GoblinCalculator12); 	// Register this calculator
}

Example 1.3 contains further examples of custom calculators.

Can you configure a calculator? MP gives a few ways to do that:

  1. Options: Parameter specified in the Protobuf graph definition, see example 1.4.
  2. Side packets: Input and output data packets that are sent only once (and not for each timestamp). Example 1.5.
  3. Extra stream: This can contain options for each timestamp. For example, stream 0 for video frames, and stream 1 for crop boxes of some sort. Example 2.3.

Let’s process images

Now let’s process images (and a stream of images is actually a video). MP has a special type for images, ImageFrame. It can be converted back and forth to cv::Mat. Example 2.1 is a trivial example with PassThroughCalculator, but with video data. The graph is simple:

string protoG = R"(
    	input_stream: "in",
    	output_stream: "out",
    	node {
        	calculator: "PassThroughCalculator",
        	input_stream: "in",
        	output_stream: "out",
    	}
    	)";

Our Observer callback now converts the packet to cv::Mat and displays the image on the screen.

auto cb = [](const Packet &packet)->Status{
    	cout << packet.Timestamp() << ": RECEIVED VIDEO PACKET !" << endl;
    	// Get data from packet (you should be used to this by now)
    	const ImageFrame & outputFrame = packet.Get<ImageFrame>();
    	// Represent ImageFrame data as cv::Mat (MatView is a thin wrapper, no copying)
    	cv::Mat ofMat = formats::MatView(&outputFrame);
    	// Convert RGB->BGR
    	cv::Mat frameOut;
    	cvtColor(ofMat, frameOut, cv::COLOR_RGB2BGR);
    	// Display frame on screen and quit on ESC
    	// Returning non-OK status aborts graph execution
    	// I'll make a nicer quit in later examples
    	cv::imshow("frameOut", frameOut);
    	if (27 == cv::waitKey(1))
        	// I was not sure which Abseil error to use here ...
        	return absl::CancelledError("It's time to QUIT !");
    	else
        	return OkStatus();
	};

Note that we return an error code for a smooth quit from the application if the ESC key is pressed. If the Observer callback returns an error, the whole MP graph stops.

Now we take frames from the camera, convert them to ImageFrame, and send them to MP in an endless loop, which we break out of on a failed MP_RETURN_IF_ERROR() check:

for (int i=0; ; ++i){
    	// Read next frame from camera
    	cap.read(frameIn);
    	if (frameIn.empty())
        	return absl::NotFoundError("CANNOT OPEN CAMERA !");
    	// Convert BGR to RGB
    	cv::cvtColor(frameIn, frameInRGB, cv::COLOR_BGR2RGB);
    	// Create an empty RGB ImageFrame with the same size as our image
    	ImageFrame *inputFrame =  new ImageFrame(
        	ImageFormat::SRGB, frameInRGB.cols, frameInRGB.rows, ImageFrame::kDefaultAlignmentBoundary
    	);
    	// Copy data from cv::Mat to Imageframe, using
    	// MatView: a cv::Mat representation of ImageFrame
    	frameInRGB.copyTo(formats::MatView(inputFrame));
    	// Create and send a video packet
    	uint64 ts = i;
    	// Adopt() creates a new packet from a raw pointer, and takes this pointer under MP management
    	MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("in",
        	Adopt(inputFrame).At(Timestamp(ts))
    	));
	}

Our further video examples:

  • 2.2: Video pipeline with ImageCroppingCalculator and ScaleImageCalculator
  • 2.3: Video pipeline with ImageCroppingCalculator (dynamic crop)
  • 2.4: Video pipeline with FeatureDetectorCalculator and custom image processing. Here we write a custom calculator for processing images.

ImageCroppingCalculator, ScaleImageCalculator and FeatureDetectorCalculator are three standard image-processing calculators of MediaPipe. There are many more.

How to make MediaPipe real-time?

By default, MP is NOT real-time. It processes all packets deterministically, in the order of increasing timestamps, without loosing any packets. Any MP stream automatically has a buffer of unlimited size. This is fine if we want to process a video file offline.

As we all know, it is NOT acceptable for real-time pipelines. If we set up a real-time source of packets, and the pipeline is not fast enough to process them, the buffers will fill more and more, while increasing the lag, until they fill all RAM and your application crashes (Example 3.1). 

Is it possible to create a real-time pipeline in MP? Yes. There are several ways. The simplest (and used in Google deep learning examples) is to put a FlowLimiterCalculator at the beginning of the pipeline. This calculator has a second input stream, which should be plugged into the output stream of the pipeline. It then compares the timestamps of two streams. If they are too different, it means that the buffers start to fill up, and, above a certain threshold (which can be adjusted), FlowLimiterCalculator starts to drop packets. A typical pipeline from the Google face detection example is (output video is actually sent to the “FINISHED” input of FlowLimiter, but the visualizer does not show such connections):

The right panel shows the subgraph FaceDetectionFrontCpu, which is a typical TFLite inference pipeline.

Our example 3.2 demonstrates the use of FlowLimiter.

What next?

MediaPipe has the following modules, each with a number of standard calculators:

  • audio
  • core
  • image
  • tensor
  • tensorflow
  • tflite
  • util
  • video

In our tutorial we focused on the basic MP concepts, there are lots of things we did not cover:

  • We barely touched the standard calculators
  • Using GPU
  • Audio processing
  • Deep learning with TFLite or TensorFlow
  • Solutions
  • Input policies
  • Languages and OSes other than C++/Desktop

And we repeat our final verdict: MediaPipe would be very nice, if not for Bazel. Bazel (and all related issues) makes you think twice before deciding to use MediaPipe in your C++ project.

A C++ Mini-Tutorial on MediaPipe
Tagged on: