DataHub
CLI-based Ingestion
Install the Plugin
```shell
pip install 'acryl-datahub[datahub]'
```
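With the plugin installed, a recipe file can be run through the DataHub CLI. A minimal sketch, assuming the sample recipe shown below is saved as `recipe.yaml` (the filename is a placeholder):

```shell
datahub ingest -c recipe.yaml
```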
Config Details
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Type | Description | Default |
|---|---|---|---|
| `commit_state_interval` | integer | Number of records to process before committing state. | 1000 |
| `commit_with_parse_errors` | boolean | Whether to update the `createdon` timestamp and Kafka offset despite parse errors. Enable if you want to ignore the errors. | False |
| `include_all_versions` | boolean | If enabled, include all versions of each aspect. Otherwise, only include the latest version of each aspect. | False |
| `kafka_topic_name` | string | Name of the Kafka topic containing timeseries MCLs. | MetadataChangeLog_Timeseries_v1 |
| `mysql_batch_size` | integer | Number of records to fetch from MySQL at a time. | 10000 |
| `mysql_table_name` | string | Name of the MySQL table containing all versioned aspects. | metadata_aspect_v2 |
| `kafka_connection` | KafkaConsumerConnectionConfig | Kafka connection config. | {'bootstrap': 'localhost:9092', 'schema_registry_url': 'http://localhost:8080/schema-registry/api/', 'schema_registry_config': {}, 'client_timeout_seconds': 60, 'consumer_config': {}} |
| `kafka_connection.bootstrap` | string | | localhost:9092 |
| `kafka_connection.client_timeout_seconds` | integer | The request timeout used when interacting with the Kafka APIs. | 60 |
| `kafka_connection.consumer_config` | object | Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. | |
| `kafka_connection.schema_registry_config` | object | Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient. | |
| `kafka_connection.schema_registry_url` | string | | http://localhost:8080/schema-registry/api/ |
| `mysql_connection` | MySQLConnectionConfig | MySQL connection config. | {'username': None, 'host_port': 'localhost:3306', 'database': None, 'database_alias': None, 'scheme': 'mysql+pymysql', 'sqlalchemy_uri': None, 'options': {}} |
| `mysql_connection.database` | string | Database (catalog). | |
| `mysql_connection.database_alias` | string | [Deprecated] Alias to apply to the database when ingesting. | |
| `mysql_connection.host_port` | string | MySQL host URL. | localhost:3306 |
| `mysql_connection.options` | object | Any options specified here will be passed to SQLAlchemy.create_engine as kwargs. | |
| `mysql_connection.password` | string (password) | Password. | |
| `mysql_connection.scheme` | string | | mysql+pymysql |
| `mysql_connection.sqlalchemy_uri` | string | URI of the database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over the other connection parameters. | |
| `mysql_connection.username` | string | Username. | |
| `stateful_ingestion` | StatefulIngestionConfig | Stateful ingestion config. | {'enabled': True, 'max_checkpoint_state_size': 16777216, 'state_provider': {'type': 'datahub', 'config': None}, 'ignore_old_state': False, 'ignore_new_state': False} |
| `stateful_ingestion.enabled` | boolean | Whether stateful ingestion is enabled. | False |
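For example, the following recipe sketch shows how the dotted names in the table map to nested YAML keys (e.g. `mysql_connection.host_port` becomes a `host_port` key under `mysql_connection`). All hostnames, credentials, and the pipeline name are placeholders, and the `datahub-rest` sink is only one possible destination:

```yaml
# Sample recipe for the DataHub source; all values are placeholders.
pipeline_name: datahub_source_pipeline  # a pipeline name is required for stateful ingestion
source:
  type: datahub
  config:
    include_all_versions: false
    mysql_connection:
      host_port: localhost:3306
      database: datahub
      username: datahub_reader      # placeholder
      password: example_password    # placeholder
    kafka_connection:
      bootstrap: localhost:9092
      schema_registry_url: http://localhost:8080/schema-registry/api/
    stateful_ingestion:
      enabled: true
sink:
  type: datahub-rest
  config:
    server: http://localhost:8080   # placeholder DataHub GMS endpoint
```

Alternatively, `mysql_connection.sqlalchemy_uri` (e.g. `mysql+pymysql://user:pass@localhost:3306/datahub`) can replace the individual MySQL connection parameters, which it takes precedence over.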
The JSONSchema for this configuration is inlined below.
```json
{
  "title": "DataHubSourceConfig",
  "description": "Base configuration class for stateful ingestion for source configs to inherit from.",
  "type": "object",
  "properties": {
    "stateful_ingestion": {
      "title": "Stateful Ingestion",
      "description": "Stateful Ingestion Config",
      "default": {
        "enabled": true,
        "max_checkpoint_state_size": 16777216,
        "state_provider": {
          "type": "datahub",
          "config": null
        },
        "ignore_old_state": false,
        "ignore_new_state": false
      },
      "allOf": [
        {
          "$ref": "#/definitions/StatefulIngestionConfig"
        }
      ]
    },
    "mysql_connection": {
      "title": "Mysql Connection",
      "description": "MySQL connection config",
      "default": {
        "username": null,
        "host_port": "localhost:3306",
        "database": null,
        "database_alias": null,
        "scheme": "mysql+pymysql",
        "sqlalchemy_uri": null,
        "options": {}
      },
      "allOf": [
        {
          "$ref": "#/definitions/MySQLConnectionConfig"
        }
      ]
    },
    "kafka_connection": {
      "title": "Kafka Connection",
      "description": "Kafka connection config",
      "default": {
        "bootstrap": "localhost:9092",
        "schema_registry_url": "http://localhost:8080/schema-registry/api/",
        "schema_registry_config": {},
        "client_timeout_seconds": 60,
        "consumer_config": {}
      },
      "allOf": [
        {
          "$ref": "#/definitions/KafkaConsumerConnectionConfig"
        }
      ]
    },
    "include_all_versions": {
      "title": "Include All Versions",
      "description": "If enabled, include all versions of each aspect. Otherwise, only include the latest version of each aspect.",
      "default": false,
      "type": "boolean"
    },
    "mysql_batch_size": {
      "title": "Mysql Batch Size",
      "description": "Number of records to fetch from MySQL at a time",
      "default": 10000,
      "type": "integer"
    },
    "mysql_table_name": {
      "title": "Mysql Table Name",
      "description": "Name of MySQL table containing all versioned aspects",
      "default": "metadata_aspect_v2",
      "type": "string"
    },
    "kafka_topic_name": {
      "title": "Kafka Topic Name",
      "description": "Name of kafka topic containing timeseries MCLs",
      "default": "MetadataChangeLog_Timeseries_v1",
      "type": "string"
    },
    "commit_state_interval": {
      "title": "Commit State Interval",
      "description": "Number of records to process before committing state",
      "default": 1000,
      "type": "integer"
    },
    "commit_with_parse_errors": {
      "title": "Commit With Parse Errors",
      "description": "Whether to update createdon timestamp and kafka offset despite parse errors. Enable if you want to ignore the errors.",
      "default": false,
      "type": "boolean"
    }
  },
  "definitions": {
    "DynamicTypedStateProviderConfig": {
      "title": "DynamicTypedStateProviderConfig",
      "type": "object",
      "properties": {
        "type": {
          "title": "Type",
          "description": "The type of the state provider to use. For DataHub use `datahub`",
          "type": "string"
        },
        "config": {
          "title": "Config",
          "description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "StatefulIngestionConfig": {
      "title": "StatefulIngestionConfig",
      "description": "Basic Stateful Ingestion Specific Configuration for any source.",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "The type of the ingestion state provider registered with datahub.",
          "default": false,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "MySQLConnectionConfig": {
      "title": "MySQLConnectionConfig",
      "type": "object",
      "properties": {
        "username": {
          "title": "Username",
          "description": "username",
          "type": "string"
        },
        "password": {
          "title": "Password",
          "description": "password",
          "type": "string",
          "writeOnly": true,
          "format": "password"
        },
        "host_port": {
          "title": "Host Port",
          "description": "MySQL host URL.",
          "default": "localhost:3306",
          "type": "string"
        },
        "database": {
          "title": "Database",
          "description": "database (catalog)",
          "type": "string"
        },
        "database_alias": {
          "title": "Database Alias",
          "description": "[Deprecated] Alias to apply to database when ingesting.",
          "type": "string"
        },
        "scheme": {
          "title": "Scheme",
          "default": "mysql+pymysql",
          "type": "string"
        },
        "sqlalchemy_uri": {
          "title": "Sqlalchemy Uri",
          "description": "URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters.",
          "type": "string"
        },
        "options": {
          "title": "Options",
          "description": "Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.",
          "type": "object"
        }
      },
      "additionalProperties": false
    },
    "KafkaConsumerConnectionConfig": {
      "title": "KafkaConsumerConnectionConfig",
      "description": "Configuration class for holding connectivity information for Kafka consumers",
      "type": "object",
      "properties": {
        "bootstrap": {
          "title": "Bootstrap",
          "default": "localhost:9092",
          "type": "string"
        },
        "schema_registry_url": {
          "title": "Schema Registry Url",
          "default": "http://localhost:8080/schema-registry/api/",
          "type": "string"
        },
        "schema_registry_config": {
          "title": "Schema Registry Config",
          "description": "Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient",
          "type": "object"
        },
        "client_timeout_seconds": {
          "title": "Client Timeout Seconds",
          "description": "The request timeout used when interacting with the Kafka APIs.",
          "default": 60,
          "type": "integer"
        },
        "consumer_config": {
          "title": "Consumer Config",
          "description": "Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .",
          "type": "object"
        }
      },
      "additionalProperties": false
    }
  }
}
```
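Because `kafka_connection.consumer_config` and `kafka_connection.schema_registry_config` are passed through to the underlying clients, standard librdkafka and SchemaRegistryClient options can be supplied there. A minimal sketch for a SASL-secured cluster, with placeholder credentials (the option keys are generic librdkafka / schema registry client settings, not specific to this source):

```yaml
# Fragment only: extra options forwarded to Kafka's DeserializingConsumer
# and SchemaRegistryClient; all values are placeholders.
source:
  type: datahub
  config:
    kafka_connection:
      bootstrap: broker-1.example.com:9092
      consumer_config:
        security.protocol: SASL_SSL
        sasl.mechanism: PLAIN
        sasl.username: my-kafka-user
        sasl.password: my-kafka-password
      schema_registry_config:
        basic.auth.user.info: "registry-user:registry-password"
```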
Code Coordinates
- Class Name: `datahub.ingestion.source.datahub.datahub_source.DataHubSource`
- [Browse on GitHub](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py)
Questions
If you've got any questions on configuring ingestion for DataHub, feel free to ping us on our Slack.