What is wrong with this Flink CDC MySQL DataStream API job?

Flink CDC version: 2.1.1, MySQL: 5.7, Flink: 1.15.1.

I copied the official example almost line for line. It produces no output when run from IntelliJ IDEA, and submitting the jar to the Flink cluster fails with:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

Java code:

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.example.utils.TransformUtil;

public class ApplyFundExecutives {

    public static void main(String[] args) {
        Properties debeziumProperties = new Properties();
        debeziumProperties.put("snapshot.locking.mode", "none");

        MySqlSource<String> applyFundSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(33061)
                .databaseList("existing_database") // set captured database
                .tableList("some_table")           // set captured table
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 3000 milliseconds
        env.enableCheckpointing(3000);

        env.fromSource(
                applyFundSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL ApplyFund Source").setParallelism(4).print().setParallelism(1);

        try {
            env.execute("Print MySQL Snapshot + Binlog");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
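A side note on the code above (not part of the original post): the debeziumProperties object is built but never handed to the source builder, so snapshot.locking.mode=none has no effect. If disabling snapshot locking is intended, the MySqlSource builder accepts it via debeziumProperties(...). A minimal sketch, using the same placeholder connection settings (the class and helper name are hypothetical):

    import java.util.Properties;

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

    public class ApplyFundSourceSketch {

        // Hypothetical helper: builds the same source, but actually wires in the Debezium properties.
        public static MySqlSource<String> buildSource() {
            Properties debeziumProperties = new Properties();
            debeziumProperties.put("snapshot.locking.mode", "none"); // skip table locks during the snapshot phase

            return MySqlSource.<String>builder()
                    .hostname("127.0.0.1")
                    .port(33061)
                    .databaseList("existing_database")
                    .tableList("existing_database.some_table") // tableList expects database-qualified names
                    .username("root")
                    .password("root")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .debeziumProperties(debeziumProperties) // this call is what the original code omits
                    .build();
        }
    }

This does not change the resource error reported above; it only makes the Debezium settings actually reach the connector.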

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>etl-table</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>alibaba</id>
            <name>ali Repository</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Source-related jars -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
        </dependency>

        <!-- Flink connectors: Stream || DataSet || Table -->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.15.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.15.1</version>
        </dependency>

        <!-- Sink-related jars -->
        <!-- https://docs.microsoft.com/zh-cn/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver15#using-the-jdbc-driver-with-maven-central -->
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>10.2.0.jre8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.15.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>

        <!-- CNFE: org.apache.flink.connector.base.source.reader.RecordEmitter -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>1.15.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.15.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.15.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.15.1</version>
        </dependency>
    </dependencies>
</project>


Answer:

Found the problem:
1) Version mismatch: the Flink version must match the one the Flink CDC release declares. Flink CDC 2.1.1 targets Flink 1.13.6, not 1.15.1.
2) Related jars: copy the non-Flink jars your project depends on into Flink's lib directory (%FLINK_HOME%/lib). Apart from the JDBC driver, the versions Flink CDC pulls in also differ between releases, so they must not be mixed. A version-aligned dependency sketch follows below.
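As an illustration of point 1 (a sketch, not part of the original answer): with flink-connector-mysql-cdc 2.1.1, the Flink dependencies in the pom would be pinned to 1.13.6. Note that in the 1.13.x line flink-streaming-java and flink-clients still carry a Scala suffix; _2.12 is assumed here to match the _2.12 table artifacts already in the pom.

    <properties>
        <!-- Flink CDC 2.1.1 is built against Flink 1.13.x, so keep all Flink artifacts on 1.13.6 -->
        <flink.version>1.13.6</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

The cluster the jar is submitted to would then also need to run Flink 1.13.6, with the matching jars under %FLINK_HOME%/lib as described in point 2.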
