{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Using TensorFlow DALI plugin: using various readers\n", "\n", "### Overview\n", "\n", "This example shows how different readers can be used to interact with TensorFlow. It shows how flexible DALI is.\n", "\n", "The following readers are used in this example:\n", "\n", "- readers.mxnet\n", "- readers.caffe\n", "- readers.file\n", "- readers.tfrecord\n", "\n", "For details on how to use them, please see the other [examples](../../index.rst)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us start by defining some global constants.\n", "\n", "The `DALI_EXTRA_PATH` environment variable should point to the location where the data from the [DALI extra repository](https://github.com/NVIDIA/DALI_extra) has been downloaded. Please make sure that the proper release tag is checked out." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import os.path\n", "\n", "test_data_root = os.environ[\"DALI_EXTRA_PATH\"]\n", "\n", "# MXNet RecordIO\n", "db_folder = os.path.join(test_data_root, \"db\", \"recordio/\")\n", "\n", "# Caffe LMDB\n", "lmdb_folder = os.path.join(test_data_root, \"db\", \"lmdb\")\n", "\n", "# image dir with plain jpeg files\n", "image_dir = \"../../data/images\"\n", "\n", "# TFRecord\n", "tfrecord = os.path.join(test_data_root, \"db\", \"tfrecord\", \"train\")\n", "tfrecord_idx = \"idx_files/train.idx\"\n", "tfrecord2idx_script = \"tfrecord2idx\"\n", "\n", "N = 8 # number of GPUs\n", "BATCH_SIZE = 128 # batch size per GPU\n", "ITERATIONS = 32\n", "IMAGE_SIZE = 3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the idx file by calling the `tfrecord2idx` script" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from subprocess import call\n", "import os.path\n", "\n", "if not os.path.exists(\"idx_files\"):\n", " os.mkdir(\"idx_files\")\n", "\n", "if not os.path.isfile(tfrecord_idx):\n", " 
call([tfrecord2idx_script, tfrecord, tfrecord_idx])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us define:\n", "- common part of the processing graph, used by all pipelines" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from nvidia.dali import pipeline_def, Pipeline\n", "import nvidia.dali.fn as fn\n", "import nvidia.dali.types as types\n", "\n", "\n", "def common_pipeline(jpegs, labels):\n", " images = fn.decoders.image(jpegs, device=\"mixed\")\n", " images = fn.resize(\n", " images, resize_shorter=fn.random.uniform(range=(256, 480)), interp_type=types.INTERP_LINEAR\n", " )\n", " images = fn.crop_mirror_normalize(\n", " images,\n", " crop_pos_x=fn.random.uniform(range=(0.0, 1.0)),\n", " crop_pos_y=fn.random.uniform(range=(0.0, 1.0)),\n", " dtype=types.FLOAT,\n", " crop=(227, 227),\n", " mean=[128.0, 128.0, 128.0],\n", " std=[1.0, 1.0, 1.0],\n", " )\n", " return images, labels" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- MXNet reader pipeline" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "@pipeline_def\n", "def mxnet_reader_pipeline(num_gpus):\n", " jpegs, labels = fn.readers.mxnet(\n", " path=[db_folder + \"train.rec\"],\n", " index_path=[db_folder + \"train.idx\"],\n", " random_shuffle=True,\n", " shard_id=Pipeline.current().device_id,\n", " num_shards=num_gpus,\n", " name=\"Reader\",\n", " )\n", "\n", " return common_pipeline(jpegs, labels)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- Caffe reader pipeline" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "@pipeline_def\n", "def caffe_reader_pipeline(num_gpus):\n", " jpegs, labels = fn.readers.caffe(\n", " path=lmdb_folder,\n", " random_shuffle=True,\n", " shard_id=Pipeline.current().device_id,\n", " num_shards=num_gpus,\n", " name=\"Reader\",\n", " )\n", "\n", " return common_pipeline(jpegs, labels)" ] }, { 
"cell_type": "markdown", "metadata": {}, "source": [ "- File reader pipeline" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "@pipeline_def\n", "def file_reader_pipeline(num_gpus):\n", " jpegs, labels = fn.readers.file(\n", " file_root=image_dir,\n", " random_shuffle=True,\n", " shard_id=Pipeline.current().device_id,\n", " num_shards=num_gpus,\n", " name=\"Reader\",\n", " )\n", "\n", " return common_pipeline(jpegs, labels)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- TFRecord reader pipeline" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "import nvidia.dali.tfrecord as tfrec\n", "\n", "\n", "@pipeline_def\n", "def tfrecord_reader_pipeline(num_gpus):\n", " inputs = fn.readers.tfrecord(\n", " path=tfrecord,\n", " index_path=tfrecord_idx,\n", " features={\n", " \"image/encoded\": tfrec.FixedLenFeature((), tfrec.string, \"\"),\n", " \"image/class/label\": tfrec.FixedLenFeature([1], tfrec.int64, -1),\n", " },\n", " random_shuffle=True,\n", " shard_id=Pipeline.current().device_id,\n", " num_shards=num_gpus,\n", " name=\"Reader\",\n", " )\n", "\n", " return common_pipeline(inputs[\"image/encoded\"], inputs[\"image/class/label\"])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let us create function which builds pipeline on demand:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import tensorflow as tf\n", "import nvidia.dali.plugin.tf as dali_tf\n", "\n", "from tensorflow.compat.v1 import GPUOptions\n", "from tensorflow.compat.v1 import ConfigProto\n", "from tensorflow.compat.v1 import Session\n", "from tensorflow.compat.v1 import placeholder\n", "\n", "tf.compat.v1.disable_eager_execution()\n", "\n", "\n", "def get_batch_test_dali(batch_size, pipe_type):\n", " pipe_name, label_type, _ = pipe_type\n", " pipes = [\n", " pipe_name(batch_size=BATCH_SIZE, num_threads=2, device_id=device_id, 
num_gpus=N)\n", " for device_id in range(N)\n", " ]\n", "\n", " daliop = dali_tf.DALIIterator()\n", " images = []\n", " labels = []\n", " for d in range(N):\n", " with tf.device(\"/gpu:%i\" % d):\n", " image, label = daliop(\n", " pipeline=pipes[d],\n", " shapes=[(BATCH_SIZE, 3, 227, 227), ()],\n", " dtypes=[tf.int32, label_type],\n", " device_id=d,\n", " )\n", " images.append(image)\n", " labels.append(label)\n", "\n", " return [images, labels]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, let us test that all pipelines have been built correctly and run them in a TF session" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "RUN: mxnet_reader_pipeline\n", "OK : mxnet_reader_pipeline\n", "RUN: caffe_reader_pipeline\n", "OK : caffe_reader_pipeline\n", "RUN: file_reader_pipeline\n", "OK : file_reader_pipeline\n", "RUN: tfrecord_reader_pipeline\n", "OK : tfrecord_reader_pipeline\n" ] } ], "source": [ "import numpy as np\n", "\n", "pipe_types = [\n", " [mxnet_reader_pipeline, tf.float32, (0, 999)],\n", " [caffe_reader_pipeline, tf.int32, (0, 999)],\n", " [file_reader_pipeline, tf.int32, (0, 1)],\n", " [tfrecord_reader_pipeline, tf.int64, (1, 1000)],\n", "]\n", "\n", "for pipe_name in pipe_types:\n", " print(\"RUN: \" + pipe_name[0].__name__)\n", " test_batch = get_batch_test_dali(BATCH_SIZE, pipe_name)\n", " x = placeholder(tf.float32, shape=[BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, 3], name=\"x\")\n", " gpu_options = GPUOptions(per_process_gpu_memory_fraction=0.8)\n", " config = ConfigProto(gpu_options=gpu_options)\n", "\n", " with Session(config=config) as sess:\n", " for i in range(ITERATIONS):\n", " imgs, labels = sess.run(test_batch)
            # Testing correctness of labels
            for label in labels:
                ## labels need to be integers
                assert np.equal(np.mod(label, 1), 0).all()
                ## labels need to be in range pipe_name[2]
                assert (label >= pipe_name[2][0]).all()
                assert (label <= pipe_name[2][1]).all()
        print("OK : " + pipe_name[0].__name__)