# PIPELINE DEFINITION
# Name: cats-vs-dogs-classification-from-minio
# Description: A pipeline to train and evaluate a cats-vs-dogs classifier using dataset from MinIO.
# Inputs:
#    batch_size: int [Default: 32.0]
#    bucket_name: str [Default: 'xrwang']
#    epochs: int [Default: 10.0]
#    local_zip_path: str [Default: '/tmp/cat-dog_data.zip']
#    minio_access_key: str [Default: 'LEINAOYUNOS']
#    minio_endpoint: str [Default: 'miniogw-dev2.cnbita.com:11443']
#    minio_secret_key: str [Default: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYLEINAOYUNKEY']
#    object_name: str [Default: 'catdog/cat-dog_data.zip']
#    secure: bool [Default: True]
# Outputs:
#    evaluate-model-metrics: system.Metrics
#
# NOTE(review): this looks like compiler output (see sdkVersion at the bottom).
# Any change made to the embedded component code here should presumably be
# mirrored in the Python pipeline source it was compiled from -- confirm.

# Input and output types of each component.
components:
  comp-download-and-prepare-data-from-minio:
    executorLabel: exec-download-and-prepare-data-from-minio
    inputDefinitions:
      parameters:
        bucket_name:
          parameterType: STRING
        local_zip_path:
          parameterType: STRING
        minio_access_key:
          parameterType: STRING
        minio_endpoint:
          parameterType: STRING
        minio_secret_key:
          parameterType: STRING
        object_name:
          parameterType: STRING
        secure:
          defaultValue: true
          isOptional: true
          parameterType: BOOLEAN
    outputDefinitions:
      artifacts:
        dataset_output:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
  comp-evaluate-model:
    executorLabel: exec-evaluate-model
    inputDefinitions:
      artifacts:
        dataset_input:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
        model_dir:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
    outputDefinitions:
      artifacts:
        metrics:
          artifactType:
            schemaTitle: system.Metrics
            schemaVersion: 0.0.1
  comp-train-model:
    executorLabel: exec-train-model
    inputDefinitions:
      artifacts:
        dataset_input:
          artifactType:
            schemaTitle: system.Dataset
            schemaVersion: 0.0.1
      parameters:
        batch_size:
          parameterType: NUMBER_INTEGER
        epochs:
          parameterType: NUMBER_INTEGER
    outputDefinitions:
      artifacts:
        model_output:
          artifactType:
            schemaTitle: system.Model
            schemaVersion: 0.0.1
defaultPipelineRoot: minio://xrwang/artifacts/
# Container image, bootstrap commands and Python source for each executor.
# Every executor runs three chained programs: a shell script that installs the
# Python dependencies, a shell script that writes the component source to a
# temporary file and starts the KFP executor, and the component source itself.
deploymentSpec:
  executors:
    exec-download-and-prepare-data-from-minio:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - download_and_prepare_data_from_minio
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location \
            'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' \
            && python3 -m pip install --quiet --no-warn-script-location \
            'minio' 'pandas' 'tensorflow' 'pathlib' 'zipfile36' \
            && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main \
            --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def download_and_prepare_data_from_minio(
              dataset_output: Output[Dataset],
              minio_endpoint: str,
              minio_access_key: str,
              minio_secret_key: str,
              bucket_name: str,
              object_name: str,
              local_zip_path: str,
              secure: bool = True
          ):
              from minio import Minio
              from zipfile import ZipFile
              from pathlib import Path
              import os
              client = Minio(
                  minio_endpoint,
                  access_key=minio_access_key,
                  secret_key=minio_secret_key,
                  secure=secure
              )
              print ("download_and_prepare_data_from_minio")
              client.fget_object(bucket_name, object_name, local_zip_path)
              with ZipFile(local_zip_path, 'r') as zip_ref:
                  zip_ref.extractall(path=dataset_output.path)
              os.remove(local_zip_path)
              print(f"Data prepared at: {dataset_output.path}")
        image: python:3.7
    exec-evaluate-model:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - evaluate_model
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location \
            'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' \
            && python3 -m pip install --quiet --no-warn-script-location \
            'tensorflow' 'pandas' \
            && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main \
            --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def evaluate_model(
              model_dir: Input[Model],
              dataset_input: Input[Dataset],
              metrics: Output[Metrics]
          ):
              import tensorflow as tf
              import os

              # Load the model
              model = tf.keras.models.load_model(model_dir.path)

              # Load the validation dataset
              validation_dir = os.path.join(dataset_input.path, 'validation')
              validation_dataset = tf.keras.utils.image_dataset_from_directory(
                  validation_dir,
                  image_size=(180, 180),
                  batch_size=32)

              # Evaluate the model
              eval_result = model.evaluate(validation_dataset)

              # Record the evaluation results
              metrics.log_metric("loss", eval_result[0])
              metrics.log_metric("accuracy", eval_result[1])
        image: registry.cnbita.com:5000/kubeflow-pipelines/python-tensorflow:3.7
    exec-train-model:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - train_model
        command:
        - sh
        - -c
        - |
          if ! [ -x "$(command -v pip)" ]; then
              python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
          fi

          PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location \
            'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<"3.9"' \
            && python3 -m pip install --quiet --no-warn-script-location \
            'tensorflow' 'numpy' \
            && "$0" "$@"
        - sh
        - -ec
        - |
          program_path=$(mktemp -d)

          printf "%s" "$0" > "$program_path/ephemeral_component.py"
          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main \
            --component_module_path "$program_path/ephemeral_component.py" "$@"
        - |
          import kfp
          from kfp import dsl
          from kfp.dsl import *
          from typing import *

          def train_model(
              dataset_input: Input[Dataset],
              model_output: Output[Model],
              epochs: int,
              batch_size: int
          ):
              import tensorflow as tf
              import os
              train_dir = os.path.join(dataset_input.path, 'train')
              validation_dir = os.path.join(dataset_input.path, 'validation')

              train_dataset = tf.keras.utils.image_dataset_from_directory(
                  train_dir,
                  image_size=(180, 180),
                  batch_size=batch_size)

              validation_dataset = tf.keras.utils.image_dataset_from_directory(
                  validation_dir,
                  image_size=(180, 180),
                  batch_size=batch_size)

              model = tf.keras.Sequential([
                  tf.keras.layers.Rescaling(1./255),
                  tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
                  tf.keras.layers.MaxPooling2D(),
                  tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
                  tf.keras.layers.MaxPooling2D(),
                  tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
                  tf.keras.layers.MaxPooling2D(),
                  tf.keras.layers.Flatten(),
                  tf.keras.layers.Dense(128, activation='relu'),
                  tf.keras.layers.Dense(2)
              ])

              model.compile(optimizer='adam',
                            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                            metrics=['accuracy'])

              # Both datasets were already batched with batch_size when they were
              # built above, so batch_size is not passed to fit() a second time.
              model.fit(train_dataset, validation_data=validation_dataset, epochs=epochs)
              model.save(model_output.path)
        image: registry.cnbita.com:5000/kubeflow-pipelines/python-tensorflow:3.7
pipelineInfo:
  description: A pipeline to train and evaluate a cats-vs-dogs classifier using dataset from MinIO.
  name: cats-vs-dogs-classification-from-minio
root:
  dag:
    outputs:
      artifacts:
        evaluate-model-metrics:
          artifactSelectors:
          - outputArtifactKey: metrics
            producerSubtask: evaluate-model
    # The tasks section records, for every task:
    #   1. which component each of its input parameters comes from,
    #   2. its dependencies on other tasks,
    #   3. its name (unique identifier).
    tasks:
      download-and-prepare-data-from-minio:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-download-and-prepare-data-from-minio
        inputs:
          parameters:
            bucket_name:
              componentInputParameter: bucket_name
            local_zip_path:
              componentInputParameter: local_zip_path
            minio_access_key:
              componentInputParameter: minio_access_key
            minio_endpoint:
              componentInputParameter: minio_endpoint
            minio_secret_key:
              componentInputParameter: minio_secret_key
            object_name:
              componentInputParameter: object_name
            secure:
              componentInputParameter: secure
        taskInfo:
          name: download-and-prepare-data-from-minio
      evaluate-model:
        cachingOptions: {}
        # Where the current component (evaluate-model) is defined
        # (comp-evaluate-model).
        componentRef:
          name: comp-evaluate-model
        # Tasks that must finish before this one runs.
        dependentTasks:
        - download-and-prepare-data-from-minio
        - train-model
        # Which task produces each input of this component.
        inputs:
          artifacts:
            dataset_input:
              taskOutputArtifact:
                outputArtifactKey: dataset_output
                producerTask: download-and-prepare-data-from-minio
            model_dir:
              taskOutputArtifact:
                outputArtifactKey: model_output
                producerTask: train-model
        taskInfo:
          name: evaluate-model
      train-model:
        cachingOptions: {}
        componentRef:
          name: comp-train-model
        dependentTasks:
        - download-and-prepare-data-from-minio
        inputs:
          artifacts:
            dataset_input:
              taskOutputArtifact:
                outputArtifactKey: dataset_output
                producerTask: download-and-prepare-data-from-minio
          parameters:
            batch_size:
              componentInputParameter: batch_size
            epochs:
              componentInputParameter: epochs
        taskInfo:
          name: train-model
  # Pipeline-level inputs; every parameter is optional and falls back to the
  # default listed here.
  inputDefinitions:
    parameters:
      batch_size:
        defaultValue: 32.0
        isOptional: true
        parameterType: NUMBER_INTEGER
      bucket_name:
        defaultValue: xrwang
        isOptional: true
        parameterType: STRING
      epochs:
        defaultValue: 10.0
        isOptional: true
        parameterType: NUMBER_INTEGER
      local_zip_path:
        defaultValue: /tmp/cat-dog_data.zip
        isOptional: true
        parameterType: STRING
      # NOTE(review): the MinIO access key and secret key below are stored as
      # plaintext defaults. They are kept so existing runs that rely on the
      # defaults still work, but they should be rotated and supplied at run
      # time (or from a secret store) instead of living in this file.
      minio_access_key:
        defaultValue: 'LEINAOYUNOS'
        isOptional: true
        parameterType: STRING
      minio_endpoint:
        defaultValue: 'miniogw-dev2.cnbita.com:11443'
        isOptional: true
        parameterType: STRING
      minio_secret_key:
        defaultValue: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYLEINAOYUNKEY'
        isOptional: true
        parameterType: STRING
      object_name:
        defaultValue: catdog/cat-dog_data.zip
        isOptional: true
        parameterType: STRING
      secure:
        defaultValue: true
        isOptional: true
        parameterType: BOOLEAN
  outputDefinitions:
    artifacts:
      evaluate-model-metrics:
        artifactType:
          schemaTitle: system.Metrics
          schemaVersion: 0.0.1
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0